blob: 413c0d8f17dc075024b530aa545e7dec782f8515 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Richard Barnetteffed1722016-05-18 15:57:22 -07002
3#pylint: disable=C0111
4
mbligh36768f02008-02-22 18:28:33 +00005"""
6Autotest scheduler
7"""
showard909c7a62008-07-15 21:52:38 +00008
Dan Shif6c65bd2014-08-29 16:15:07 -07009import datetime
10import gc
11import logging
12import optparse
13import os
14import signal
15import sys
16import time
showard402934a2009-12-21 22:20:47 +000017
Alex Miller05d7b4c2013-03-04 07:49:38 -080018import common
showard21baa452008-10-21 00:08:39 +000019from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000020
21import django.db
Aviv Keshet65fed072016-06-29 10:20:55 -070022from chromite.lib import metrics
Richard Barnetteffed1722016-05-18 15:57:22 -070023from chromite.lib import ts_mon_config
showard402934a2009-12-21 22:20:47 +000024
Dan Shiec1d47d2015-02-13 11:38:13 -080025from autotest_lib.client.common_lib import control_data
Prashanth B0e960282014-05-13 19:38:28 -070026from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070027from autotest_lib.client.common_lib import utils
Gabe Black1e1c41b2015-02-04 23:55:15 -080028from autotest_lib.client.common_lib.cros.graphite import autotest_stats
Prashanth B0e960282014-05-13 19:38:28 -070029from autotest_lib.frontend.afe import models, rpc_utils
Fang Dengc330bee2014-10-21 18:10:55 -070030from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070031from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
32from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070033from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070034from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070035from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000036from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080037from autotest_lib.scheduler import status_server, scheduler_config
Aviv Keshet308e7362013-05-21 14:43:16 -070038from autotest_lib.server import autoserv_utils
Dan Shi114e1722016-01-10 18:12:53 -080039from autotest_lib.server import system_utils
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080040from autotest_lib.server import utils as server_utils
Dan Shicf2e8dd2015-05-07 17:18:48 -070041from autotest_lib.site_utils import metadata_reporter
Dan Shib9144a42014-12-01 16:09:32 -080042from autotest_lib.site_utils import server_manager_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080043
Dan Shicf2e8dd2015-05-07 17:18:48 -070044
# Pidfile prefixes used by utils.program_is_alive / write_pid to detect an
# already-running scheduler or babysitter instance.
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Overwritten with the positional argument in main_without_exception_handling.
RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the environment to point at a different autotest tree.
# Note: dict.has_key() was removed in Python 3; the `in` operator is the
# portable (and preferred) membership test.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000058
# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Scheduler-wide singletons; populated by initialize()/initialize_globals().
_db_manager = None
_db = None
# Set to True by handle_signal() to ask the main loop to exit cleanly.
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None
# When True, the dispatcher acquires/releases hosts for jobs inline with the
# tick; when False a DummyHostScheduler is used instead (see
# BaseDispatcher.__init__).
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)

# Whether autoserv may run jobs inside server-side packaging containers.
_enable_ssp_container = global_config.global_config.get_config_value(
        'AUTOSERV', 'enable_ssp_container', type=bool,
        default=True)
mbligh36768f02008-02-22 18:28:33 +000082
mbligh83c1e9e2009-05-01 23:10:41 +000083def _site_init_monitor_db_dummy():
84 return {}
85
86
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled without a default set."""
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise scheduler_lib.SchedulerError(
            'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000092
93
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler."""
    # Currently the only consistency check is the drone-set configuration.
    _verify_default_drone_set_exists()
97
98
def main():
    """Scheduler entry point.

    Runs the real scheduler body, logging any escaping exception before
    re-raising it.  SystemExit is propagated untouched, and the scheduler
    pidfile is removed on the way out regardless of how we exit.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000110
111
def main_without_exception_handling():
    """Parse options, bring up all scheduler subsystems, and run the tick loop.

    Startup order: logging -> option parsing -> production-settings check ->
    status server -> metadata reporter -> ts_mon -> initialize()/Dispatcher.
    On exit (shutdown flag, status-server request, or exception) the
    subsystems are torn down in reverse.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    # Exactly one positional argument (the results directory) is required.
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Optional site hook; falls back to the no-op dummy when absent.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    # Start the thread to report metadata.
    metadata_reporter.start()

    ts_mon_config.SetupTsMonGlobalState('autotest_scheduler')

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)
        minimum_tick_sec = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)

        # Main loop: tick at most once per minimum_tick_sec, sleeping off any
        # remainder of the interval (or yielding briefly when a tick ran long).
        while not _shutdown and not server._shutdown_scheduler:
            start = time.time()
            dispatcher.tick()
            curr_tick_sec = time.time() - start
            if (minimum_tick_sec > curr_tick_sec):
                time.sleep(minimum_tick_sec - curr_tick_sec)
            else:
                time.sleep(0.0001)
    except server_manager_utils.ServerActionError as e:
        # This error is expected when the server is not in primary status
        # for scheduler role. Thus do not send email for it.
        logging.exception(e)
    except Exception:
        email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    # Teardown: stop reporters, flush queued email, then shut down the status
    # server, drone manager, and database connection.
    metadata_reporter.abort()
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000205
206
def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler: ask the main loop to exit after this tick.

    @param signum: delivered signal number (unused).
    @param frame: interrupted stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000211
212
def initialize():
    """One-time scheduler startup: pidfile, DB connection, signals, drones."""
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start when another scheduler instance still owns the pidfile.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    # Under --test, point the ORM at the stress-test database instead.
    if _testing_mode:
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    # If server database is enabled, check if the server has role `scheduler`.
    # If the server does not have scheduler role, exception will be raised and
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    # Make autoserv and friends resolvable for child processes.
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    drone_list = system_utils.get_drones()
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000252
253
def initialize_globals():
    """Bind the module-level _drone_manager to the shared singleton instance."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
257
258
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line for a set of machines.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns The autoserv command line as a list of executable + parameters.
    """
    return autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose, in_lab=True)
showard87ba02a2009-04-20 19:37:32 +0000277
278
Simran Basia858a232012-08-21 11:04:37 -0700279class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800280
281
    def __init__(self):
        # Active Agent objects currently being driven by _handle_agents().
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db, _drone_manager)
        # Indexes: host id -> set(Agent) and queue entry id -> set(Agent),
        # maintained by _register_agent_for_ids/_unregister_agent_for_ids.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        # Interval between GC stats reports, in seconds (config value is in
        # minutes; default 6 hours).
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        # Debug-logging switches consumed by _log_tick_msg/_log_extra_msg.
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
313
mbligh36768f02008-02-22 18:28:33 +0000314
showard915958d2009-04-22 21:00:58 +0000315 def initialize(self, recover_hosts=True):
316 self._periodic_cleanup.initialize()
317 self._24hr_upkeep.initialize()
Dan Shi55d58992015-05-05 09:10:02 -0700318 # Execute all actions queued in the cleanup tasks. Scheduler tick will
319 # run a refresh task first. If there is any action in the queue, refresh
320 # will raise an exception.
321 _drone_manager.execute_actions()
showard915958d2009-04-22 21:00:58 +0000322
jadmanski0afbb632008-06-06 21:10:57 +0000323 # always recover processes
324 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000325
jadmanski0afbb632008-06-06 21:10:57 +0000326 if recover_hosts:
327 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000328
329
Simran Basi0ec94dd2012-08-28 09:50:10 -0700330 def _log_tick_msg(self, msg):
331 if self._tick_debug:
332 logging.debug(msg)
333
334
Simran Basidef92872012-09-20 13:34:34 -0700335 def _log_extra_msg(self, msg):
336 if self._extra_debugging:
337 logging.debug(msg)
338
339
    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.

        NOTE(review): step order is significant — see the comment before
        _run_cleanup() below; reorder with care.
        """
        timer = autotest_stats.Timer('scheduler.tick')
        system_utils.DroneCache.refresh()
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
        _drone_manager.trigger_refresh()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
        _drone_manager.sync_refresh()
        # _run_cleanup must be called between drone_manager.sync_refresh, and
        # drone_manager.execute_actions, as sync_refresh will clear the calls
        # queued in drones. Therefore, any action that calls drone.queue_call
        # to add calls to the drone._calls, should be after drone refresh is
        # completed and before drone_manager.execute_actions at the end of the
        # tick.
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
        metrics.Counter('chromeos/autotest/scheduler/tick').increment()
mbligh36768f02008-02-22 18:28:33 +0000391
showard97aed502008-11-04 02:01:24 +0000392
mblighf3294cc2009-04-08 21:17:38 +0000393 def _run_cleanup(self):
394 self._periodic_cleanup.run_cleanup_maybe()
395 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000396
mbligh36768f02008-02-22 18:28:33 +0000397
showardf13a9e22009-12-18 22:54:09 +0000398 def _garbage_collection(self):
399 threshold_time = time.time() - self._seconds_between_garbage_stats
400 if threshold_time < self._last_garbage_stats_time:
401 # Don't generate these reports very often.
402 return
403
404 self._last_garbage_stats_time = time.time()
405 # Force a full level 0 collection (because we can, it doesn't hurt
406 # at this interval).
407 gc.collect()
408 logging.info('Logging garbage collector stats on tick %d.',
409 self._tick_count)
410 gc_stats._log_garbage_collector_stats()
411
412
showard170873e2009-01-07 00:22:26 +0000413 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
414 for object_id in object_ids:
415 agent_dict.setdefault(object_id, set()).add(agent)
416
417
418 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
419 for object_id in object_ids:
420 assert object_id in agent_dict
421 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700422 # If an ID has no more active agent associated, there is no need to
423 # keep it in the dictionary. Otherwise, scheduler will keep an
424 # unnecessarily big dictionary until being restarted.
425 if not agent_dict[object_id]:
426 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000427
428
showardd1195652009-12-08 22:21:02 +0000429 def add_agent_task(self, agent_task):
beeps8bb1f7d2013-08-05 01:30:09 -0700430 """
431 Creates and adds an agent to the dispatchers list.
432
433 In creating the agent we also pass on all the queue_entry_ids and
434 host_ids from the special agent task. For every agent we create, we
435 add it to 1. a dict against the queue_entry_ids given to it 2. A dict
436 against the host_ids given to it. So theoritically, a host can have any
437 number of agents associated with it, and each of them can have any
438 special agent task, though in practice we never see > 1 agent/task per
439 host at any time.
440
441 @param agent_task: A SpecialTask for the agent to manage.
442 """
showardd1195652009-12-08 22:21:02 +0000443 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000444 self._agents.append(agent)
445 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000446 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
447 self._register_agent_for_ids(self._queue_entry_agents,
448 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000449
showard170873e2009-01-07 00:22:26 +0000450
451 def get_agents_for_entry(self, queue_entry):
452 """
453 Find agents corresponding to the specified queue_entry.
454 """
showardd3dc1992009-04-22 21:01:40 +0000455 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000456
457
458 def host_has_agent(self, host):
459 """
460 Determine if there is currently an Agent present using this host.
461 """
462 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000463
464
jadmanski0afbb632008-06-06 21:10:57 +0000465 def remove_agent(self, agent):
466 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000467 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
468 agent)
469 self._unregister_agent_for_ids(self._queue_entry_agents,
470 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000471
472
showard8cc058f2009-09-08 16:26:33 +0000473 def _host_has_scheduled_special_task(self, host):
474 return bool(models.SpecialTask.objects.filter(host__id=host.id,
475 is_active=False,
476 is_complete=False))
477
478
    def _recover_processes(self):
        """Re-attach to autoserv processes left over from a previous run.

        The steps run in a deliberate sequence: recovery tasks are built and
        their pidfiles registered before the drone refresh, then tasks,
        pending entries, and unrecovered verifying entries are handled, and
        remaining hosts are re-verified.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000491
showard170873e2009-01-07 00:22:26 +0000492
showardd1195652009-12-08 22:21:02 +0000493 def _create_recovery_agent_tasks(self):
494 return (self._get_queue_entry_agent_tasks()
495 + self._get_special_task_agent_tasks(is_active=True))
496
497
498 def _get_queue_entry_agent_tasks(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700499 """
500 Get agent tasks for all hqe in the specified states.
501
502 Loosely this translates to taking a hqe in one of the specified states,
503 say parsing, and getting an AgentTask for it, like the FinalReparseTask,
504 through _get_agent_task_for_queue_entry. Each queue entry can only have
505 one agent task at a time, but there might be multiple queue entries in
506 the group.
507
508 @return: A list of AgentTasks.
509 """
showardd1195652009-12-08 22:21:02 +0000510 # host queue entry statuses handled directly by AgentTasks (Verifying is
511 # handled through SpecialTasks, so is not listed here)
512 statuses = (models.HostQueueEntry.Status.STARTING,
513 models.HostQueueEntry.Status.RUNNING,
514 models.HostQueueEntry.Status.GATHERING,
mbligh4608b002010-01-05 18:22:35 +0000515 models.HostQueueEntry.Status.PARSING,
516 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000517 status_list = ','.join("'%s'" % status for status in statuses)
jamesrenc44ae992010-02-19 00:12:54 +0000518 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardd1195652009-12-08 22:21:02 +0000519 where='status IN (%s)' % status_list)
Gabe Black1e1c41b2015-02-04 23:55:15 -0800520 autotest_stats.Gauge('scheduler.jobs_per_tick').send(
Alex Miller47cd2472013-11-25 15:20:04 -0800521 'running', len(queue_entries))
showardd1195652009-12-08 22:21:02 +0000522
523 agent_tasks = []
524 used_queue_entries = set()
525 for entry in queue_entries:
526 if self.get_agents_for_entry(entry):
527 # already being handled
528 continue
529 if entry in used_queue_entries:
530 # already picked up by a synchronous job
531 continue
532 agent_task = self._get_agent_task_for_queue_entry(entry)
533 agent_tasks.append(agent_task)
534 used_queue_entries.update(agent_task.queue_entries)
535 return agent_tasks
showard170873e2009-01-07 00:22:26 +0000536
537
showardd1195652009-12-08 22:21:02 +0000538 def _get_special_task_agent_tasks(self, is_active=False):
539 special_tasks = models.SpecialTask.objects.filter(
540 is_active=is_active, is_complete=False)
541 return [self._get_agent_task_for_special_task(task)
542 for task in special_tasks]
543
544
545 def _get_agent_task_for_queue_entry(self, queue_entry):
546 """
beeps8bb1f7d2013-08-05 01:30:09 -0700547 Construct an AgentTask instance for the given active HostQueueEntry.
548
showardd1195652009-12-08 22:21:02 +0000549 @param queue_entry: a HostQueueEntry
beeps8bb1f7d2013-08-05 01:30:09 -0700550 @return: an AgentTask to run the queue entry
showardd1195652009-12-08 22:21:02 +0000551 """
552 task_entries = queue_entry.job.get_group_entries(queue_entry)
553 self._check_for_duplicate_host_entries(task_entries)
554
555 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
556 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000557 if queue_entry.is_hostless():
558 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000559 return QueueTask(queue_entries=task_entries)
560 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
beeps5e2bb4a2013-10-28 11:26:45 -0700561 return postjob_task.GatherLogsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000562 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
beeps5e2bb4a2013-10-28 11:26:45 -0700563 return postjob_task.FinalReparseTask(queue_entries=task_entries)
mbligh4608b002010-01-05 18:22:35 +0000564 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
beeps5e2bb4a2013-10-28 11:26:45 -0700565 return postjob_task.ArchiveResultsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000566
Prashanth B0e960282014-05-13 19:38:28 -0700567 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800568 '_get_agent_task_for_queue_entry got entry with '
569 'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000570
571
572 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000573 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
574 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000575 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000576 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000577 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000578 if using_host:
showardd1195652009-12-08 22:21:02 +0000579 self._assert_host_has_no_agent(task_entry)
580
581
582 def _assert_host_has_no_agent(self, entry):
583 """
584 @param entry: a HostQueueEntry or a SpecialTask
585 """
586 if self.host_has_agent(entry.host):
587 agent = tuple(self._host_agents.get(entry.host.id))[0]
Prashanth B0e960282014-05-13 19:38:28 -0700588 raise scheduler_lib.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000589 'While scheduling %s, host %s already has a host agent %s'
590 % (entry, entry.host, agent.task))
591
592
593 def _get_agent_task_for_special_task(self, special_task):
594 """
595 Construct an AgentTask class to run the given SpecialTask and add it
596 to this dispatcher.
beeps8bb1f7d2013-08-05 01:30:09 -0700597
MK Ryu35d661e2014-09-25 17:44:10 -0700598 A special task is created through schedule_special_tasks, but only if
beeps8bb1f7d2013-08-05 01:30:09 -0700599 the host doesn't already have an agent. This happens through
600 add_agent_task. All special agent tasks are given a host on creation,
601 and a Null hqe. To create a SpecialAgentTask object, you need a
602 models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
603 object contains a hqe it's passed on to the special agent task, which
604 creates a HostQueueEntry and saves it as it's queue_entry.
605
showardd1195652009-12-08 22:21:02 +0000606 @param special_task: a models.SpecialTask instance
607 @returns an AgentTask to run this SpecialTask
608 """
609 self._assert_host_has_no_agent(special_task)
610
beeps5e2bb4a2013-10-28 11:26:45 -0700611 special_agent_task_classes = (prejob_task.CleanupTask,
612 prejob_task.VerifyTask,
613 prejob_task.RepairTask,
614 prejob_task.ResetTask,
615 prejob_task.ProvisionTask)
616
showardd1195652009-12-08 22:21:02 +0000617 for agent_task_class in special_agent_task_classes:
618 if agent_task_class.TASK_TYPE == special_task.task:
619 return agent_task_class(task=special_task)
620
Prashanth B0e960282014-05-13 19:38:28 -0700621 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800622 'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000623
624
625 def _register_pidfiles(self, agent_tasks):
626 for agent_task in agent_tasks:
627 agent_task.register_necessary_pidfiles()
628
629
630 def _recover_tasks(self, agent_tasks):
631 orphans = _drone_manager.get_orphaned_autoserv_processes()
632
633 for agent_task in agent_tasks:
634 agent_task.recover()
635 if agent_task.monitor and agent_task.monitor.has_process():
636 orphans.discard(agent_task.monitor.get_process())
637 self.add_agent_task(agent_task)
638
639 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000640
641
showard8cc058f2009-09-08 16:26:33 +0000642 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000643 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
644 % status):
showard0db3d432009-10-12 20:29:15 +0000645 if entry.status == status and not self.get_agents_for_entry(entry):
646 # The status can change during iteration, e.g., if job.run()
647 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000648 yield entry
649
650
showard6878e8b2009-07-20 22:37:45 +0000651 def _check_for_remaining_orphan_processes(self, orphans):
652 if not orphans:
653 return
654 subject = 'Unrecovered orphan autoserv processes remain'
655 message = '\n'.join(str(process) for process in orphans)
656 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000657
658 die_on_orphans = global_config.global_config.get_config_value(
659 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
660
661 if die_on_orphans:
662 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000663
showard170873e2009-01-07 00:22:26 +0000664
showard8cc058f2009-09-08 16:26:33 +0000665 def _recover_pending_entries(self):
666 for entry in self._get_unassigned_entries(
667 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000668 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000669 entry.on_pending()
670
671
showardb8900452009-10-12 20:31:01 +0000672 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000673 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000674 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
675 unrecovered_hqes = []
676 for queue_entry in queue_entries:
677 special_tasks = models.SpecialTask.objects.filter(
678 task__in=(models.SpecialTask.Task.CLEANUP,
679 models.SpecialTask.Task.VERIFY),
680 queue_entry__id=queue_entry.id,
681 is_complete=False)
682 if special_tasks.count() == 0:
683 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000684
showardb8900452009-10-12 20:31:01 +0000685 if unrecovered_hqes:
686 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700687 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000688 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000689 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000690
691
showard65db3932009-10-28 19:54:35 +0000692 def _schedule_special_tasks(self):
693 """
694 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700695
696 Special tasks include PreJobTasks like verify, reset and cleanup.
697 They are created through _schedule_new_jobs and associated with a hqe
698 This method translates SpecialTasks to the appropriate AgentTask and
699 adds them to the dispatchers agents list, so _handle_agents can execute
700 them.
showard65db3932009-10-28 19:54:35 +0000701 """
Prashanth B4ec98672014-05-15 10:44:54 -0700702 # When the host scheduler is responsible for acquisition we only want
703 # to run tasks with leased hosts. All hqe tasks will already have
704 # leased hosts, and we don't want to run frontend tasks till the host
705 # scheduler has vetted the assignment. Note that this doesn't include
706 # frontend tasks with hosts leased by other active hqes.
707 for task in self._job_query_manager.get_prioritized_special_tasks(
708 only_tasks_with_leased_hosts=not _inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000709 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000710 continue
showardd1195652009-12-08 22:21:02 +0000711 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000712
713
showard170873e2009-01-07 00:22:26 +0000714 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000715 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000716 # should never happen
showarded2afea2009-07-07 20:54:07 +0000717 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000718 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000719 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700720 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000721 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000722
723
jadmanski0afbb632008-06-06 21:10:57 +0000724 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000725 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700726 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000727 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000728 if self.host_has_agent(host):
729 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000730 continue
showard8cc058f2009-09-08 16:26:33 +0000731 if self._host_has_scheduled_special_task(host):
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700732 # host will have a special task scheduled on the next tick
showard8cc058f2009-09-08 16:26:33 +0000733 continue
showard170873e2009-01-07 00:22:26 +0000734 if print_message:
showardb18134f2009-03-20 20:52:18 +0000735 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000736 models.SpecialTask.objects.create(
737 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000738 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000739
740
jadmanski0afbb632008-06-06 21:10:57 +0000741 def _recover_hosts(self):
742 # recover "Repair Failed" hosts
743 message = 'Reverifying dead host %s'
744 self._reverify_hosts_where("status = 'Repair Failed'",
745 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000746
747
showard89f84db2009-03-12 20:39:13 +0000748 def _refresh_pending_queue_entries(self):
749 """
750 Lookup the pending HostQueueEntries and call our HostScheduler
751 refresh() method given that list. Return the list.
752
753 @returns A list of pending HostQueueEntries sorted in priority order.
754 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700755 queue_entries = self._job_query_manager.get_pending_queue_entries(
756 only_hostless=not _inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000757 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000758 return []
showard89f84db2009-03-12 20:39:13 +0000759 return queue_entries
760
761
showarda9545c02009-12-18 22:44:26 +0000762 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800763 """Schedule a hostless (suite) job.
764
765 @param queue_entry: The queue_entry representing the hostless job.
766 """
showarda9545c02009-12-18 22:44:26 +0000767 self.add_agent_task(HostlessQueueTask(queue_entry))
Jakob Juelichd615a1e2014-09-04 11:48:36 -0700768
769 # Need to set execution_subdir before setting the status:
770 # After a restart of the scheduler, agents will be restored for HQEs in
771 # Starting, Running, Gathering, Parsing or Archiving. To do this, the
772 # execution_subdir is needed. Therefore it must be set before entering
773 # one of these states.
774 # Otherwise, if the scheduler was interrupted between setting the status
775 # and the execution_subdir, upon it's restart restoring agents would
776 # fail.
777 # Is there a way to get a status in one of these states without going
778 # through this code? Following cases are possible:
779 # - If it's aborted before being started:
780 # active bit will be 0, so there's nothing to parse, it will just be
781 # set to completed by _find_aborting. Critical statuses are skipped.
782 # - If it's aborted or it fails after being started:
783 # It was started, so this code was executed.
784 queue_entry.update_field('execution_subdir', 'hostless')
jamesren47bd7372010-03-13 00:58:17 +0000785 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000786
787
beepscc9fc702013-12-02 12:45:38 -0800788 def _schedule_host_job(self, host, queue_entry):
789 """Schedules a job on the given host.
790
791 1. Assign the host to the hqe, if it isn't already assigned.
792 2. Create a SpecialAgentTask for the hqe.
793 3. Activate the hqe.
794
795 @param queue_entry: The job to schedule.
796 @param host: The host to schedule the job on.
797 """
798 if self.host_has_agent(host):
799 host_agent_task = list(self._host_agents.get(host.id))[0].task
800 subject = 'Host with agents assigned to an HQE'
801 message = ('HQE: %s assigned host %s, but the host has '
802 'agent: %s for queue_entry %s. The HQE '
beepsf7d35162014-02-13 16:42:13 -0800803 'will have to try and acquire a host next tick ' %
beepscc9fc702013-12-02 12:45:38 -0800804 (queue_entry, host.hostname, host_agent_task,
805 host_agent_task.queue_entry))
806 email_manager.manager.enqueue_notify_email(subject, message)
beepscc9fc702013-12-02 12:45:38 -0800807 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700808 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800809
810
    def _schedule_new_jobs(self):
        """
        Find any new HQEs and call schedule_pre_job_tasks for it.

        This involves setting the status of the HQE and creating a row in the
        db corresponding the the special task, through
        scheduler_models._queue_special_task. The new db row is then added as
        an agent to the dispatcher through _schedule_special_tasks and
        scheduled for execution on the drone through _handle_agents.
        """
        queue_entries = self._refresh_pending_queue_entries()

        # Per-tick counters reported to the stats backend under this key.
        key = 'scheduler.jobs_per_tick'
        new_hostless_jobs = 0
        new_jobs_with_hosts = 0
        new_jobs_need_hosts = 0
        host_jobs = []
        logging.debug('Processing %d queue_entries', len(queue_entries))

        # Hostless entries are scheduled immediately; host-bound entries are
        # collected for host assignment below.
        for queue_entry in queue_entries:
            if queue_entry.is_hostless():
                self._schedule_hostless_job(queue_entry)
                new_hostless_jobs = new_hostless_jobs + 1
            else:
                host_jobs.append(queue_entry)
                new_jobs_need_hosts = new_jobs_need_hosts + 1

        autotest_stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
        if not host_jobs:
            return
        # When the standalone host scheduler owns acquisition, host-bound
        # entries are unexpected here; flag them but still acquire hosts.
        if not _inline_host_acquisition:
            message = ('Found %s jobs that need hosts though '
                       '_inline_host_acquisition=%s. Will acquire hosts.' %
                       ([str(job) for job in host_jobs],
                        _inline_host_acquisition))
            email_manager.manager.enqueue_notify_email(
                    'Processing unexpected host acquisition requests', message)
        jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
        for host_assignment in jobs_with_hosts:
            self._schedule_host_job(host_assignment.host, host_assignment.job)
            new_jobs_with_hosts = new_jobs_with_hosts + 1

        autotest_stats.Gauge(key).send('new_jobs_with_hosts',
                                       new_jobs_with_hosts)
        autotest_stats.Gauge(key).send('new_jobs_without_hosts',
                                       new_jobs_need_hosts -
                                       new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000858
859
showard8cc058f2009-09-08 16:26:33 +0000860 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700861 """
862 Adds agents to the dispatcher.
863
864 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
865 QueueTask for example, will have a job with a control file, and
866 the agent will have methods that poll, abort and check if the queue
867 task is finished. The dispatcher runs the agent_task, as well as
868 other agents in it's _agents member, through _handle_agents, by
869 calling the Agents tick().
870
871 This method creates an agent for each HQE in one of (starting, running,
872 gathering, parsing, archiving) states, and adds it to the dispatcher so
873 it is handled by _handle_agents.
874 """
showardd1195652009-12-08 22:21:02 +0000875 for agent_task in self._get_queue_entry_agent_tasks():
876 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000877
878
879 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000880 for entry in scheduler_models.HostQueueEntry.fetch(
881 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000882 task = entry.job.schedule_delayed_callback_task(entry)
883 if task:
showardd1195652009-12-08 22:21:02 +0000884 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000885
886
    def _find_aborting(self):
        """
        Looks through the afe_host_queue_entries for an aborted entry.

        The aborted bit is set on an HQE in many ways, the most common
        being when a user requests an abort through the frontend, which
        results in an rpc from the afe to abort_host_queue_entries.
        """
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted=1 and complete=0'):

            # If the job is running on a shard, let the shard handle aborting
            # it and sync back the right status.
            if entry.job.shard_id is not None and not server_utils.is_shard():
                logging.info('Waiting for shard %s to abort hqe %s',
                             entry.job.shard_id, entry)
                continue

            logging.info('Aborting %s', entry)

            # The task would have started off with both is_complete and
            # is_active = False. Aborted tasks are neither active nor complete.
            # For all currently active tasks this will happen through the
            # agent, but we need to manually update the special tasks that
            # haven't started yet, because they don't have agents.
            models.SpecialTask.objects.filter(is_active=False,
                    queue_entry_id=entry.id).update(is_complete=True)

            # Abort any running agents before aborting the HQE itself, so the
            # processes are torn down before the entry's state is finalized.
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        # Deduplicated per-job cleanup: stop sibling entries if required.
        for job in jobs_to_stop:
            job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000923
924
    def _find_aborted_special_tasks(self):
        """
        Find SpecialTasks that have been marked for abortion.

        Poll the database looking for SpecialTasks that are active
        and have been marked for abortion, then abort them.
        """

        # The completed and active bits are very important when it comes
        # to scheduler correctness. The active bit is set through the prolog
        # of a special task, and reset through the cleanup method of the
        # SpecialAgentTask. The cleanup is called both through the abort and
        # epilog. The complete bit is set in several places, and in general
        # a hanging job will have is_active=1 is_complete=0, while a special
        # task which completed will have is_active=0 is_complete=1. To check
        # aborts we directly check active because the complete bit is set in
        # several places, including the epilog of agent tasks.
        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
                                                          is_aborted=True)
        for task in aborted_tasks:
            # There are 2 ways to get the agent associated with a task,
            # through the host and through the hqe. A special task
            # always needs a host, but doesn't always need a hqe.
            for agent in self._host_agents.get(task.host.id, []):
                if isinstance(agent.task, agent_task.SpecialAgentTask):

                    # The epilog preforms critical actions such as
                    # queueing the next SpecialTask, requeuing the
                    # hqe etc, however it doesn't actually kill the
                    # monitor process and set the 'done' bit. Epilogs
                    # assume that the job failed, and that the monitor
                    # process has already written an exit code. The
                    # done bit is a necessary condition for
                    # _handle_agents to schedule any more special
                    # tasks against the host, and it must be set
                    # in addition to is_active, is_complete and success.
                    # NOTE(review): epilog() is deliberately called before
                    # abort(); do not reorder these two calls.
                    agent.task.epilog()
                    agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700963
964
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700965 def _can_start_agent(self, agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000966 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000967 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000968 return True
969 # don't allow any nonzero-process agents to run after we've reached a
970 # limit (this avoids starvation of many-process agents)
971 if have_reached_limit:
972 return False
973 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000974 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000975 agent.task.owner_username,
976 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000977 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000978 return False
showard4c5374f2008-09-04 17:02:56 +0000979 return True
980
981
    def _handle_agents(self):
        """
        Handles agents of the dispatcher.

        Appropriate Agents are added to the dispatcher through
        _schedule_running_host_queue_entries. These agents each
        have a task. This method runs the agents task through
        agent.tick() leading to:
            agent.start
                prolog -> AgentTasks prolog
                For each queue entry:
                    sets host status/status to Running
                    set started_on in afe_host_queue_entries
                run -> AgentTasks run
                    Creates PidfileRunMonitor
                    Queues the autoserv command line for this AgentTask
                    via the drone manager. These commands are executed
                    through the drone managers execute actions.
                poll -> AgentTasks/BaseAgentTask poll
                    checks the monitors exit_code.
                    Executes epilog if task is finished.
                    Executes AgentTasks _finish_task
                finish_task is usually responsible for setting the status
                of the HQE/host, and updating it's active and complete fileds.

            agent.is_done
                Removed the agent from the dispatchers _agents queue.
                Is_done checks the finished bit on the agent, that is
                set based on the Agents task. During the agents poll
                we check to see if the monitor process has exited in
                it's finish method, and set the success member of the
                task based on this exit code.
        """
        num_started_this_tick = 0
        num_finished_this_tick = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                # Throttle: once the limit is hit, skip (but don't remove)
                # every not-yet-started agent for the rest of this tick.
                if not self._can_start_agent(agent, have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_tick += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            # tick() runs for both newly-started and already-running agents.
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                num_finished_this_tick += agent.task.num_processes
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
                'agents_started', num_started_this_tick)
        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
                'agents_finished', num_finished_this_tick)
        logging.info('%d running processes. %d added this tick.',
                     _drone_manager.total_running_processes(),
                     num_started_this_tick)
mbligh36768f02008-02-22 18:28:33 +00001044
1045
    def _process_recurring_runs(self):
        """Instantiate every due RecurringRun as a new job and reschedule it.

        Runs whose start_date has passed are cloned from their template job;
        finite runs are counted down and deleted after their last iteration.
        """
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            # One-time hosts must be (re-)created for each new job instance.
            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)
            except Exception, ex:
                # Best-effort: a failed instantiation is logged, and the
                # run is still rescheduled below.
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1085
1086
# Allow site-specific deployments to extend the dispatcher: if the optional
# autotest_lib.scheduler.site_monitor_db module defines SiteDispatcher, it is
# used; otherwise the stock BaseDispatcher is used unchanged.
SiteDispatcher = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteDispatcher', BaseDispatcher)

# The concrete dispatcher class instantiated by the scheduler entry point.
class Dispatcher(SiteDispatcher):
    pass
1093
1094
class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task. An agent
    wraps around an AgentTask mainly to associate the AgentTask with the
    queue_entry and host ids.

    The wrapped task object must provide:
        poll()    - Called periodically to let the task check its status and
                    update its internal state.
        is_done() - Returns True if the task is finished.
        abort()   - Called when an abort has been requested. The task must
                    set its aborted attribute to True if it actually aborted.

    and the attributes:
        aborted         - bool, True if this task was aborted.
        success         - bool, True if this task succeeded.
        queue_entry_ids - HostQueueEntry ids this task handles.
        host_ids        - Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: An instance of an AgentTask.
        """
        self.task = task

        # Filled in by Dispatcher.add_agent().
        self.dispatcher = None

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        """Poll the wrapped task once, unless it has already finished."""
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        """Return True once the wrapped task has completed or aborted."""
        return self.finished


    def abort(self):
        """Forward the abort to the task; tasks can choose to ignore aborts."""
        if not self.task:
            return
        self.task.abort()
        if self.task.aborted:
            self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001150
showardd3dc1992009-04-22 21:01:40 +00001151
beeps5e2bb4a2013-10-28 11:26:45 -07001152class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001153 """
1154 Common functionality for QueueTask and HostlessQueueTask
1155 """
1156 def __init__(self, queue_entries):
1157 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001158 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001159 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001160
1161
    def _keyval_path(self):
        # Keyval file lives directly in this task's working directory.
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001164
1165
jamesrenc44ae992010-02-19 00:12:54 +00001166 def _write_control_file(self, execution_path):
1167 control_path = _drone_manager.attach_file_to_execution(
1168 execution_path, self.job.control_file)
1169 return control_path
1170
1171
    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _command_line(self):
        """Build the autoserv command line for this task's queue entries.

        @returns a list of command-line parameters for autoserv.
        """
        execution_path = self.queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        # Hostless entries contribute no machine to the -m list.
        hostnames = ','.join(entry.host.hostname
                             for entry in self.queue_entries
                             if not entry.is_hostless())

        execution_tag = self.queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self.job, verbose=False)
        if self.job.is_image_update_job():
            params += ['--image', self.job.update_image_path]

        return params
showardd1195652009-12-08 22:21:02 +00001190
1191
    @property
    def num_processes(self):
        # One autoserv-managed process per queue entry in this task.
        return len(self.queue_entries)
1195
1196
    @property
    def owner_username(self):
        # Processes spawned for this task are accounted to the job's owner.
        return self.job.owner
1200
1201
    def _working_directory(self):
        # All queue entries of one task must share a single execution path.
        return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001204
1205
    def prolog(self):
        """Record job keyvals and mark every queue entry as Running."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = self.job.keyval_dict()
        keyval_dict[queued_key] = queued_time
        group_name = self.queue_entries[0].get_group_name()
        if group_name:
            keyval_dict['host_group_name'] = group_name
        # Keyvals must be on disk before the job starts producing results.
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001217
1218
showard35162b02009-03-03 02:17:30 +00001219 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001220 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001221 _drone_manager.write_lines_to_file(error_file_path,
1222 [_LOST_PROCESS_ERROR])
1223
1224
showardd3dc1992009-04-22 21:01:40 +00001225 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001226 if not self.monitor:
1227 return
1228
showardd9205182009-04-27 20:09:55 +00001229 self._write_job_finished()
1230
showard35162b02009-03-03 02:17:30 +00001231 if self.monitor.lost_process:
1232 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001233
jadmanskif7fa2cc2008-10-01 14:13:23 +00001234
showardcbd74612008-11-19 21:42:02 +00001235 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001236 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001237 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001238 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001239 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001240
1241
jadmanskif7fa2cc2008-10-01 14:13:23 +00001242 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001243 if not self.monitor or not self.monitor.has_process():
1244 return
1245
jadmanskif7fa2cc2008-10-01 14:13:23 +00001246 # build up sets of all the aborted_by and aborted_on values
1247 aborted_by, aborted_on = set(), set()
1248 for queue_entry in self.queue_entries:
1249 if queue_entry.aborted_by:
1250 aborted_by.add(queue_entry.aborted_by)
1251 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1252 aborted_on.add(t)
1253
1254 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001255 # TODO(showard): this conditional is now obsolete, we just need to leave
1256 # it in temporarily for backwards compatibility over upgrades. delete
1257 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001258 assert len(aborted_by) <= 1
1259 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001260 aborted_by_value = aborted_by.pop()
1261 aborted_on_value = max(aborted_on)
1262 else:
1263 aborted_by_value = 'autotest_system'
1264 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001265
showarda0382352009-02-11 23:36:43 +00001266 self._write_keyval_after_job("aborted_by", aborted_by_value)
1267 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001268
showardcbd74612008-11-19 21:42:02 +00001269 aborted_on_string = str(datetime.datetime.fromtimestamp(
1270 aborted_on_value))
1271 self._write_status_comment('Job aborted by %s on %s' %
1272 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001273
1274
    def abort(self):
        """Abort the task, then record abort keyvals and finish bookkeeping."""
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()
showard21baa452008-10-21 00:08:39 +00001279
1280
    def epilog(self):
        """Run the base epilog, then write end-of-job keyvals/markers."""
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001284
1285
class QueueTask(AbstractQueueTask):
    """AbstractQueueTask for jobs whose queue entries run on real hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Validate entry/host statuses, then mark each host as Running.

        Each host is also marked dirty (presumably so cleanup runs after
        the job -- behavior of the 'dirty' field is defined elsewhere).
        """
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.host.set_status(models.Host.Status.RUNNING)
            queue_entry.host.update_field('dirty', 1)


    def _finish_task(self):
        """Finish base bookkeeping and move entries into Gathering."""
        super(QueueTask, self)._finish_task()

        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
            queue_entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Extend the base autoserv command line with extra flags.

        Adds --require-ssp (and --test_source_build when the keyval is
        present) for server-side-packaged server jobs, --parent_job_id
        when this job was spawned by another job, and always
        --verify_job_repo_url.
        """
        invocation = super(QueueTask, self)._command_line()
        # Check if server-side packaging is needed.  require_ssp is
        # tri-state (None means "use the default"), so test identity
        # against the False singleton rather than equality.
        if (_enable_ssp_container and
            self.job.control_type == control_data.CONTROL_TYPE.SERVER and
            self.job.require_ssp is not False):
            invocation += ['--require-ssp']
            keyval_dict = self.job.keyval_dict()
            test_source_build = keyval_dict.get('test_source_build', None)
            if test_source_build:
                invocation += ['--test_source_build', test_source_build]
        if self.job.parent_job_id:
            invocation += ['--parent_job_id', str(self.job.parent_job_id)]
        return invocation + ['--verify_job_repo_url']
Alex Miller9f01d5d2013-08-08 02:26:01 -07001330
1331
class HostlessQueueTask(AbstractQueueTask):
    """AbstractQueueTask for a single queue entry that has no host."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        """No host bookkeeping is needed; just run the base prolog."""
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        """Finish base bookkeeping, then hand the entry to the parser.

        A job enters the database in Starting status; a scheduler tick
        promotes it to Running and launches an autoserv process, unless
        a limit such as max_hostless_jobs_per_drone defers it.  An entry
        still in Starting therefore never started a process: there is no
        log to parse and its execution_subdir has no value yet, so moving
        it to Parsing would make the scheduler raise.  Only entries past
        Starting are advanced to Parsing.
        """
        super(HostlessQueueTask, self)._finish_task()

        entry = self.queue_entries[0]
        if entry.status != models.HostQueueEntry.Status.STARTING:
            entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001359
1360
# Script entry point: hand control to the scheduler's main() loop.
if __name__ == '__main__':
    main()