#!/usr/bin/python -u
#pylint: disable-msg=C0111

"""
Autotest scheduler
"""

import datetime, optparse, os, signal
import sys, time
import logging, gc

import common
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import agent_task, drone_manager, drones
from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
from autotest_lib.scheduler import postjob_task, scheduler_logging_config
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.server import autoserv_utils
from autotest_lib.site_utils.graphite import stats

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None

def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)


def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
            "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
            _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why processes the scheduler launches
    # misbehave.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
            logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
                DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host',
            default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
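
# Illustrative sketch only (not called anywhere in this module): a hypothetical
# invocation of the helper above for a single machine. 'machine1' and the extra
# argument are made-up placeholders; the actual flags are produced by
# autoserv_utils.autoserv_run_job_command.
#
#   cmd = _autoserv_command_line('machine1', extra_args=['--some-extra-arg'],
#                                verbose=True)
#   # cmd is a list beginning with the autoserv executable, followed by its
#   # arguments (including the -m machine list and the results directory).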


class BaseDispatcher(object):


    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def _log_extra_msg(self, msg):
        if self._extra_debugging:
            logging.debug(msg)


    def tick(self):
        """
        Run one scheduler cycle, logging when each major step begins so we
        can work out where most of the tick time is being spent.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        """
        Creates an Agent for the given task and adds it to the dispatcher's
        list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the agent task. Every agent we create is registered in
        two dicts: one keyed on the queue_entry_ids given to it and one keyed
        on the host_ids given to it. So theoretically a host can have any
        number of agents associated with it, each with its own agent task,
        though in practice we never see more than one agent/task per host at
        any time.

        @param agent_task: An AgentTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all HQEs in the states handled directly by
        AgentTasks.

        Loosely this translates to taking an HQE in one of those states, say
        parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only
        have one agent task at a time, but there might be multiple queue
        entries in the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying
        # is handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a null HQE. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a
        SpecialAgentTask object contains an HQE, it is passed on to the
        special agent task, which creates a HostQueueEntry and saves it as its
        queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where="status = '%s'" % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify, then reset, then provision.

        @return: list of afe.models.SpecialTasks sorted according to priority.
        """
635 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
636 is_complete=False,
637 host__locked=False)
638 # exclude hosts with active queue entries unless the SpecialTask is for
639 # that queue entry
showard7e67b432010-01-20 01:13:04 +0000640 queued_tasks = models.SpecialTask.objects.add_join(
showardeab66ce2009-12-23 00:03:56 +0000641 queued_tasks, 'afe_host_queue_entries', 'host_id',
642 join_condition='afe_host_queue_entries.active',
showard7e67b432010-01-20 01:13:04 +0000643 join_from_key='host_id', force_left_join=True)
showard65db3932009-10-28 19:54:35 +0000644 queued_tasks = queued_tasks.extra(
showardeab66ce2009-12-23 00:03:56 +0000645 where=['(afe_host_queue_entries.id IS NULL OR '
646 'afe_host_queue_entries.id = '
647 'afe_special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000648
showard65db3932009-10-28 19:54:35 +0000649 # reorder tasks by priority
650 task_priority_order = [models.SpecialTask.Task.REPAIR,
651 models.SpecialTask.Task.CLEANUP,
Dan Shi07e09af2013-04-12 09:31:29 -0700652 models.SpecialTask.Task.VERIFY,
Alex Millerdfff2fd2013-05-28 13:05:06 -0700653 models.SpecialTask.Task.RESET,
654 models.SpecialTask.Task.PROVISION]
showard65db3932009-10-28 19:54:35 +0000655 def task_priority_key(task):
656 return task_priority_order.index(task.task)
657 return sorted(queued_tasks, key=task_priority_key)
658
659
showard65db3932009-10-28 19:54:35 +0000660 def _schedule_special_tasks(self):
661 """
        Execute queued SpecialTasks that are ready to run on idle hosts.

        Special tasks include PreJobTasks like verify, reset and cleanup.
        They are created through _schedule_new_jobs and associated with an
        HQE. This method translates SpecialTasks to the appropriate AgentTask
        and adds them to the dispatcher's agents list, so _handle_agents can
        execute them.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning', "
                "'Provisioning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        """
        Fetch a list of new host queue entries.

        The ordering of this list is important, as every new agent
        we schedule can potentially contribute to the process count
        on the drone, which has a static limit. The sort order
        prioritizes jobs as follows:
        1. High priority jobs: Based on the afe_job's priority
        2. With hosts and metahosts: This will only happen if we don't
            activate the hqe after assigning a host to it in
            schedule_new_jobs.
        3. With hosts but without metahosts: When tests are scheduled
            through the frontend the owner of the job would have chosen
            a host for it.
        4. Without hosts but with metahosts: This is the common case of
            a new test that needs a DUT. We assign a host and set it to
            active so it shouldn't show up in case 2 on the next tick.
        5. Without hosts and without metahosts: Hostless suite jobs, that
            will result in new jobs that fall under category 4.

        A note about the ordering of cases 3 and 4:
        Prioritizing one case above the other leads to earlier acquisition
        of the following resources: 1. process slots on the drone 2. machines.
        - When a user schedules a job through the afe they choose a specific
          host for it. Jobs with metahost can utilize any host that satisfies
          the metahost criterion. This means that if we had scheduled 4 before
          3 there is a good chance that a job which could've used another
          host will now use the host assigned to a metahost-less job. Given
          the availability of machines in pool:suites, this almost guarantees
          starvation for jobs scheduled through the frontend.
        - Scheduling 4 before 3 also has its pros however, since a suite
          has the concept of a time out, whereas users can wait. If we hit the
          process count on the drone a suite can timeout waiting on the test,
          but a user job generally has a much longer timeout, and relatively
          harmless consequences.
        The current ordering was chosen because it is more likely that we
        will run out of machines in pool:suites than processes on the drone.

        @returns A list of HQEs ordered according to sort_order.
        """
        sort_order = ('afe_jobs.priority DESC, '
                      'ISNULL(host_id), '
                      'ISNULL(meta_host), '
                      'job_id')
        query = ('NOT complete AND NOT active AND status="Queued" '
                 'AND NOT aborted')
        return list(scheduler_models.HostQueueEntry.fetch(
                joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
                where=query, order_by=sort_order))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
        """
        Find any new HQEs and call schedule_pre_job_tasks for them.

        This involves setting the status of the HQE and creating a row in the
        db corresponding to the special task, through
        scheduler_models._queue_special_task. The new db row is then added as
        an agent to the dispatcher through _schedule_special_tasks and
        scheduled for execution on the drone through _handle_agents.
        """
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        new_hostless_jobs = 0
        new_atomic_groups = 0
        new_jobs_with_hosts = 0
        new_jobs_need_hosts = 0

        logging.debug('Processing %d queue_entries', len(queue_entries))
        for queue_entry in queue_entries:
            self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._log_extra_msg('Scheduling hostless job.')
                self._schedule_hostless_job(queue_entry)
                new_hostless_jobs = new_hostless_jobs + 1
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
                new_atomic_groups = new_atomic_groups + 1
            else:
                new_jobs_need_hosts = new_jobs_need_hosts + 1
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)
                    new_jobs_with_hosts = new_jobs_with_hosts + 1

        key = 'scheduler.jobs_per_tick'
        stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
        stats.Gauge(key).send('new_atomic_groups', new_atomic_groups)
        stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
        stats.Gauge(key).send('new_jobs_without_hosts',
                              new_jobs_need_hosts - new_jobs_with_hosts)


    def _schedule_running_host_queue_entries(self):
        """
867 Adds agents to the dispatcher.
868
869 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
870 QueueTask for example, will have a job with a control file, and
871 the agent will have methods that poll, abort and check if the queue
872 task is finished. The dispatcher runs the agent_task, as well as
873 other agents in it's _agents member, through _handle_agents, by
874 calling the Agents tick().
875
876 This method creates an agent for each HQE in one of (starting, running,
877 gathering, parsing, archiving) states, and adds it to the dispatcher so
878 it is handled by _handle_agents.
879 """
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
                            queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        """
        Looks through the afe_host_queue_entries for an aborted entry.

        The aborted bit is set on an HQE in many ways, the most common
        being when a user requests an abort through the frontend, which
        results in an rpc from the afe to abort_host_queue_entries.
        """
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted=1 and complete=0'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _find_aborted_special_tasks(self):
        """
        Find SpecialTasks that have been marked for abortion.

        Poll the database looking for SpecialTasks that are active
        and have been marked for abortion, then abort them.
        """

        # The completed and active bits are very important when it comes
        # to scheduler correctness. The active bit is set through the prolog
        # of a special task, and reset through the cleanup method of the
        # SpecialAgentTask. The cleanup is called both through the abort and
        # epilog. The complete bit is set in several places, and in general
        # a hanging job will have is_active=1 is_complete=0, while a special
        # task which completed will have is_active=0 is_complete=1. To check
        # aborts we directly check active because the complete bit is set in
        # several places, including the epilog of agent tasks.
        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
                                                          is_aborted=True)
        for task in aborted_tasks:
            # There are 2 ways to get the agent associated with a task,
            # through the host and through the hqe. A special task
            # always needs a host, but doesn't always need a hqe.
            for agent in self._host_agents.get(task.host.id, []):
                if isinstance(agent.task, agent_task.SpecialAgentTask):

                    # The epilog performs critical actions such as
                    # queueing the next SpecialTask, requeuing the
                    # hqe etc, however it doesn't actually kill the
                    # monitor process and set the 'done' bit. Epilogs
                    # assume that the job failed, and that the monitor
                    # process has already written an exit code. The
                    # done bit is a necessary condition for
                    # _handle_agents to schedule any more special
                    # tasks against the host, and it must be set
                    # in addition to is_active, is_complete and success.
                    agent.task.epilog()
                    agent.task.abort()


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
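    # Illustrative walk-through of the checks above, with made-up numbers and
    # assuming the drone-level max_runnable_processes check passes: if
    # max_processes_started_per_cycle is 50, an agent needing 10 processes is
    # allowed while num_started_this_cycle is at most 40, is refused once it
    # would push the cycle past 50 (unless it is the first agent started this
    # cycle), and a zero-process agent is always allowed.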


    def _handle_agents(self):
        """
        Handles agents of the dispatcher.

        Appropriate Agents are added to the dispatcher through
        _schedule_running_host_queue_entries. These agents each
        have a task. This method runs each agent's task through
        agent.tick(), leading to:
            agent.start
                prolog -> AgentTask's prolog
                    For each queue entry:
                        sets host status/status to Running
                        sets started_on in afe_host_queue_entries
                run -> AgentTask's run
                    Creates PidfileRunMonitor
                    Queues the autoserv command line for this AgentTask
                    via the drone manager. These commands are executed
                    through the drone manager's execute actions.
            poll -> AgentTask's/BaseAgentTask's poll
                checks the monitor's exit_code.
                Executes epilog if task is finished.
                Executes AgentTask's _finish_task
                finish_task is usually responsible for setting the status
                of the HQE/host, and updating its active and complete fields.

            agent.is_done
                Removes the agent from the dispatcher's _agents queue.
                is_done checks the finished bit on the agent, which is
                set based on the Agent's task. During the agent's poll
                we check to see if the monitor process has exited in
                its finish method, and set the success member of the
                task based on this exit code.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_cycle += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes. %d added this cycle.',
                     _drone_manager.total_running_processes(),
                     num_started_this_cycle)


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


SiteDispatcher = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'SiteDispatcher', BaseDispatcher)


class Dispatcher(SiteDispatcher):
    pass


class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task. An agent wraps
    around an AgentTask mainly to associate the AgentTask with the queue_entry
    and host ids.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
1101                update its internal state.
1102 is_done() - Returns True if the task is finished.
1103 abort() - Called when an abort has been requested. The task must
1104 set its aborted attribute to True if it actually aborted.
1105
1106 The following attributes are required on all task objects:
1107 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001108 success - bool, True if this task succeeded.
1109 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1110 host_ids - A sequence of Host ids this task represents.
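
    A minimal, illustrative task satisfying this interface is sketched just
    after this class definition.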
showard77182562009-06-10 00:16:05 +00001111 """
1112
1113
showard418785b2009-11-23 20:19:59 +00001114 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001115 """
Alex Miller47715eb2013-07-24 03:34:01 -07001116 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001117 """
showard8cc058f2009-09-08 16:26:33 +00001118 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001119
showard77182562009-06-10 00:16:05 +00001120 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001121 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001122
showard8cc058f2009-09-08 16:26:33 +00001123 self.queue_entry_ids = task.queue_entry_ids
1124 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001125
showard8cc058f2009-09-08 16:26:33 +00001126 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001127 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001128
1129
jadmanski0afbb632008-06-06 21:10:57 +00001130 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001131 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001132 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001133 self.task.poll()
1134 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001135 self.finished = True
showardec113162008-05-08 00:52:49 +00001136
1137
jadmanski0afbb632008-06-06 21:10:57 +00001138 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001139 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001140
1141
showardd3dc1992009-04-22 21:01:40 +00001142 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001143 if self.task:
1144 self.task.abort()
1145 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001146 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001147 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001148
showardd3dc1992009-04-22 21:01:40 +00001149
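# Illustrative sketch only (added for documentation; not used by the
# scheduler): a minimal task object that satisfies the interface described in
# the Agent docstring above. The class name and its no-op behaviour are
# hypothetical. Note that the dispatcher also reads task.num_processes when
# throttling (see the agent-handling loop above), so a real task provides it
# as well; concrete implementations derive from agent_task.AgentTask, such as
# AbstractQueueTask below.
class _ExampleNoopTask(object):
    num_processes = 0  # read when counting processes started this cycle

    def __init__(self):
        self.success = False
        self.aborted = False
        self.queue_entry_ids = ()
        self.host_ids = ()

    def poll(self):
        # A real task would check on its autoserv process here; this sketch
        # simply declares itself successful on the first poll.
        self.success = True

    def is_done(self):
        return self.success or self.aborted

    def abort(self):
        # Tasks may ignore aborts; this one honours the request.
        self.aborted = True

# The dispatcher would drive such a task through an Agent wrapper, e.g.:
#     dispatcher.add_agent(Agent(_ExampleNoopTask()))

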
beeps5e2bb4a2013-10-28 11:26:45 -07001150class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001151 """
1152 Common functionality for QueueTask and HostlessQueueTask
1153 """
1154 def __init__(self, queue_entries):
1155 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001156 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001157 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001158
1159
showard73ec0442009-02-07 02:05:20 +00001160 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001161 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001162
1163
jamesrenc44ae992010-02-19 00:12:54 +00001164 def _write_control_file(self, execution_path):
1165 control_path = _drone_manager.attach_file_to_execution(
1166 execution_path, self.job.control_file)
1167 return control_path
1168
1169
Aviv Keshet308e7362013-05-21 14:43:16 -07001170 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001171 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001172 execution_path = self.queue_entries[0].execution_path()
1173 control_path = self._write_control_file(execution_path)
1174 hostnames = ','.join(entry.host.hostname
1175 for entry in self.queue_entries
1176 if not entry.is_hostless())
1177
1178 execution_tag = self.queue_entries[0].execution_tag()
1179 params = _autoserv_command_line(
1180 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001181 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001182 _drone_manager.absolute_path(control_path)],
1183 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001184 if self.job.is_image_update_job():
1185 params += ['--image', self.job.update_image_path]
1186
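        # Illustrative summary: params is now the base autoserv invocation
        # built by _autoserv_command_line() for this job and its hosts,
        # including the '-P <execution_tag>' and '-n <control file path>'
        # arguments passed above, plus '--image <update_image_path>' for
        # image-update jobs.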
jamesrenc44ae992010-02-19 00:12:54 +00001187 return params
showardd1195652009-12-08 22:21:02 +00001188
1189
1190 @property
1191 def num_processes(self):
1192 return len(self.queue_entries)
1193
1194
1195 @property
1196 def owner_username(self):
1197 return self.job.owner
1198
1199
1200 def _working_directory(self):
1201 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001202
1203
jadmanski0afbb632008-06-06 21:10:57 +00001204 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001205 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001206 keyval_dict = self.job.keyval_dict()
1207 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001208 group_name = self.queue_entries[0].get_group_name()
1209 if group_name:
1210 keyval_dict['host_group_name'] = group_name
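        # keyval_dict now holds the job's own keyvals plus the queued-time
        # keyval and (for grouped multi-host entries) host_group_name; these
        # are written out before the queue entries are marked RUNNING below.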
showardf1ae3542009-05-11 19:26:02 +00001211 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001212 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001213 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001214 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001215
1216
showard35162b02009-03-03 02:17:30 +00001217 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001218 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001219 _drone_manager.write_lines_to_file(error_file_path,
1220 [_LOST_PROCESS_ERROR])
1221
1222
showardd3dc1992009-04-22 21:01:40 +00001223 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001224 if not self.monitor:
1225 return
1226
showardd9205182009-04-27 20:09:55 +00001227 self._write_job_finished()
1228
showard35162b02009-03-03 02:17:30 +00001229 if self.monitor.lost_process:
1230 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001231
jadmanskif7fa2cc2008-10-01 14:13:23 +00001232
showardcbd74612008-11-19 21:42:02 +00001233 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001234 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001235 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001236 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001237 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001238
1239
jadmanskif7fa2cc2008-10-01 14:13:23 +00001240 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001241 if not self.monitor or not self.monitor.has_process():
1242 return
1243
jadmanskif7fa2cc2008-10-01 14:13:23 +00001244 # build up sets of all the aborted_by and aborted_on values
1245 aborted_by, aborted_on = set(), set()
1246 for queue_entry in self.queue_entries:
1247 if queue_entry.aborted_by:
1248 aborted_by.add(queue_entry.aborted_by)
1249 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1250 aborted_on.add(t)
1251
1252        # extract a single, unique aborted_by value and write it out
showard64a95952010-01-13 21:27:16 +00001253        # TODO(showard): this conditional is now obsolete; we just need to keep
1254        # it temporarily for backwards compatibility across upgrades. Delete it
1255        # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001256 assert len(aborted_by) <= 1
1257 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001258 aborted_by_value = aborted_by.pop()
1259 aborted_on_value = max(aborted_on)
1260 else:
1261 aborted_by_value = 'autotest_system'
1262 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001263
showarda0382352009-02-11 23:36:43 +00001264 self._write_keyval_after_job("aborted_by", aborted_by_value)
1265 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001266
showardcbd74612008-11-19 21:42:02 +00001267 aborted_on_string = str(datetime.datetime.fromtimestamp(
1268 aborted_on_value))
1269 self._write_status_comment('Job aborted by %s on %s' %
1270 (aborted_by_value, aborted_on_string))
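        # The comment above appears in the job's status.log as a line of the
        # form 'INFO\t----\t----\tJob aborted by <user> on <timestamp>'
        # (see _write_status_comment).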
jadmanskic2ac77f2008-05-16 21:44:04 +00001271
1272
jadmanski0afbb632008-06-06 21:10:57 +00001273 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001274 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001275 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001276 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001277
1278
jadmanski0afbb632008-06-06 21:10:57 +00001279 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001280 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001281 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001282
1283
1284class QueueTask(AbstractQueueTask):
1285 def __init__(self, queue_entries):
1286 super(QueueTask, self).__init__(queue_entries)
1287 self._set_ids(queue_entries=queue_entries)
1288
1289
1290 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001291 self._check_queue_entry_statuses(
1292 self.queue_entries,
1293 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
1294 models.HostQueueEntry.Status.RUNNING),
1295 allowed_host_statuses=(models.Host.Status.PENDING,
1296 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00001297
1298 super(QueueTask, self).prolog()
1299
1300 for queue_entry in self.queue_entries:
1301 self._write_host_keyvals(queue_entry.host)
1302 queue_entry.host.set_status(models.Host.Status.RUNNING)
1303 queue_entry.host.update_field('dirty', 1)
showarda9545c02009-12-18 22:44:26 +00001304
1305
1306 def _finish_task(self):
1307 super(QueueTask, self)._finish_task()
1308
1309 for queue_entry in self.queue_entries:
1310 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00001311 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00001312
1313
Alex Miller9f01d5d2013-08-08 02:26:01 -07001314 def _command_line(self):
1315 invocation = super(QueueTask, self)._command_line()
1316 return invocation + ['--verify_job_repo_url']
1317
1318
Dan Shi1a189052013-10-28 14:41:35 -07001319class HostlessQueueTask(AbstractQueueTask):
mbligh4608b002010-01-05 18:22:35 +00001320 def __init__(self, queue_entry):
1321 super(HostlessQueueTask, self).__init__([queue_entry])
1322 self.queue_entry_ids = [queue_entry.id]
1323
1324
1325 def prolog(self):
1326 self.queue_entries[0].update_field('execution_subdir', 'hostless')
1327 super(HostlessQueueTask, self).prolog()
1328
1329
mbligh4608b002010-01-05 18:22:35 +00001330 def _finish_task(self):
1331 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00001332 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001333
1334
mbligh36768f02008-02-22 18:28:33 +00001335if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00001336 main()