blob: 3cacbfa3cf58658189abe84185ec1f8efd45da8b [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
Aviv Keshet225bdfe2013-03-05 10:10:08 -08002#pylint: disable-msg=C0111
mbligh36768f02008-02-22 18:28:33 +00003
4"""
5Autotest scheduler
6"""
showard909c7a62008-07-15 21:52:38 +00007
Aviv Keshet225bdfe2013-03-05 10:10:08 -08008import datetime, optparse, os, signal
beeps5e2bb4a2013-10-28 11:26:45 -07009import sys, time
Aviv Keshet225bdfe2013-03-05 10:10:08 -080010import logging, gc
showard402934a2009-12-21 22:20:47 +000011
Alex Miller05d7b4c2013-03-04 07:49:38 -080012import common
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000014
15import django.db
16
showard136e6dc2009-06-10 19:38:49 +000017from autotest_lib.client.common_lib import global_config, logging_manager
beeps5e2bb4a2013-10-28 11:26:45 -070018from autotest_lib.client.common_lib import utils
showardb1e51872008-10-07 11:08:18 +000019from autotest_lib.database import database_connection
Alex Miller05d7b4c2013-03-04 07:49:38 -080020from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
beeps5e2bb4a2013-10-28 11:26:45 -070021from autotest_lib.scheduler import agent_task, drone_manager, drones
22from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
23from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
24from autotest_lib.scheduler import postjob_task, scheduler_logging_config
jamesrenc44ae992010-02-19 00:12:54 +000025from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080026from autotest_lib.scheduler import status_server, scheduler_config
Aviv Keshet308e7362013-05-21 14:43:16 -070027from autotest_lib.server import autoserv_utils
Fang Deng1d6c2a02013-04-17 15:25:45 -070028from autotest_lib.site_utils.graphite import stats
Alex Miller05d7b4c2013-03-04 07:49:38 -080029
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the install location to be overridden from the environment; fall back
# to the checkout-relative path computed above.
# (was os.environ.has_key(...): dict.has_key is deprecated; 'in' works on
# both Python 2 and 3)
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level scheduler state, populated by initialize()/initialize_globals().
_db = None
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None
mbligh36768f02008-02-22 18:28:33 +000060
Eric Lie0493a42010-11-15 13:05:43 -080061def _parser_path_default(install_dir):
62 return os.path.join(install_dir, 'tko', 'parse')
# Resolve the TKO parser path through the optional site hook
# ('parser_path' in site_monitor_db), falling back to the default
# <install_dir>/tko/parse location.
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
67
mbligh36768f02008-02-22 18:28:33 +000068
mbligh83c1e9e2009-05-01 23:10:41 +000069def _site_init_monitor_db_dummy():
70 return {}
71
72
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled without a default.

    A configuration with drone sets turned on but no default drone set name
    is inconsistent and would break job scheduling, so fail fast.
    """
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise host_scheduler.SchedulerError(
            'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000078
79
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    # Currently the only startup consistency check is the drone-set one.
    _verify_default_drone_set_exists()
84
def main():
    """Top-level entry point: run the scheduler with crash logging.

    Wraps main_without_exception_handling() so that any escaping exception
    (other than a normal SystemExit) is logged before propagating, and the
    scheduler's pid file is always removed on the way out.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # Normal exit path (e.g. --help, disabled scheduler); re-raise
            # without logging it as a crash.
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000096
97
def main_without_exception_handling():
    """Parse options, initialize globals, and run the dispatcher loop.

    Command line: monitor_db [--recover-hosts] [--test] results_dir

    Exits with status 1 when the scheduler is disabled in the global config.
    On any uncaught exception from the main loop the stacktrace is emailed
    and the scheduler shuts down cleanly (status server, drone manager, db).
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Optional site-specific initialization hook; defaults to a no-op.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        # Test mode: substitute a dummy autoserv and flag testing globally.
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop: tick until SIGINT (_shutdown) or a shutdown request via
        # the status server.
        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # Deliberately broad: any failure is emailed, then we fall through to
        # the orderly shutdown below instead of crashing hard.
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000164
165
def setup_logging():
    """Configure scheduler logging.

    Honors the AUTOTEST_SCHEDULER_LOG_DIR and AUTOTEST_SCHEDULER_LOG_NAME
    environment variables as overrides for the log directory and file name.
    """
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    config = scheduler_logging_config.SchedulerLoggingConfig()
    logging_manager.configure_logging(config, log_dir=log_dir,
                                      logfile_name=log_name)
172
173
def handle_sigint(signum, frame):
    """SIGINT handler: request a graceful shutdown.

    Only flips the module-level _shutdown flag; the main loop in
    main_without_exception_handling() notices it on its next iteration.
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000178
179
def initialize():
    """One-time scheduler startup: pid file, database, signals, drones.

    Aborts (sys.exit(1)) if another monitor_db instance already holds the
    pid file. Populates the module globals _db and (via initialize_globals)
    _drone_manager.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point the scheduler at the stress-test database instead of the
        # production one.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    # Make autoserv and friends resolvable for child processes we spawn.
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    # Configure the drone manager from the comma-separated drone list and
    # the results host, both defaulting to localhost.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000217
218
def initialize_globals():
    """Bind the module-level _drone_manager to the singleton instance."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
222
223
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - bool - Whether autoserv should run with verbose logging.
    """
    # Delegates to autoserv_utils; _autoserv_directory is a module global so
    # tests can substitute it.
    return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
showard87ba02a2009-04-20 19:37:32 +0000241
242
Simran Basia858a232012-08-21 11:04:37 -0700243class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800244
245
    def __init__(self):
        """Set up dispatcher state and read tuning knobs from global config."""
        # Active Agent objects being driven by _handle_agents().
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # Indexes from host id / queue entry id to the set of agents touching
        # them; maintained by _register_agent_for_ids and its inverse.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        # gc stats interval is configured in minutes; stored in seconds.
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)
mbligh36768f02008-02-22 18:28:33 +0000268
mbligh36768f02008-02-22 18:28:33 +0000269
    def initialize(self, recover_hosts=True):
        """Run startup recovery before the first tick.

        @param recover_hosts: when True, also try to recover dead hosts.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()
281
mbligh36768f02008-02-22 18:28:33 +0000282
Simran Basi0ec94dd2012-08-28 09:50:10 -0700283 def _log_tick_msg(self, msg):
284 if self._tick_debug:
285 logging.debug(msg)
286
287
Simran Basidef92872012-09-20 13:34:34 -0700288 def _log_extra_msg(self, msg):
289 if self._extra_debugging:
290 logging.debug(msg)
291
292
    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.

        NOTE(review): step order matters here (e.g. garbage collection and
        drone refresh before scheduling; execute_actions after all
        scheduling) — do not reorder.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000334
showard97aed502008-11-04 02:01:24 +0000335
    def _run_cleanup(self):
        """Give the periodic and 24-hour upkeep tasks a chance to run.

        Each task decides internally ('maybe') whether its interval has
        elapsed.
        """
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000339
mbligh36768f02008-02-22 18:28:33 +0000340
showardf13a9e22009-12-18 22:54:09 +0000341 def _garbage_collection(self):
342 threshold_time = time.time() - self._seconds_between_garbage_stats
343 if threshold_time < self._last_garbage_stats_time:
344 # Don't generate these reports very often.
345 return
346
347 self._last_garbage_stats_time = time.time()
348 # Force a full level 0 collection (because we can, it doesn't hurt
349 # at this interval).
350 gc.collect()
351 logging.info('Logging garbage collector stats on tick %d.',
352 self._tick_count)
353 gc_stats._log_garbage_collector_stats()
354
355
showard170873e2009-01-07 00:22:26 +0000356 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
357 for object_id in object_ids:
358 agent_dict.setdefault(object_id, set()).add(agent)
359
360
361 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
362 for object_id in object_ids:
363 assert object_id in agent_dict
364 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700365 # If an ID has no more active agent associated, there is no need to
366 # keep it in the dictionary. Otherwise, scheduler will keep an
367 # unnecessarily big dictionary until being restarted.
368 if not agent_dict[object_id]:
369 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000370
371
    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatchers list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to 1. a dict against the queue_entry_ids given to it 2. A dict
        against the host_ids given to it. So theoretically, a host can have any
        number of agents associated with it, and each of them can have any
        special agent task, though in practice we never see > 1 agent/task per
        host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000392
showard170873e2009-01-07 00:22:26 +0000393
394 def get_agents_for_entry(self, queue_entry):
395 """
396 Find agents corresponding to the specified queue_entry.
397 """
showardd3dc1992009-04-22 21:01:40 +0000398 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000399
400
401 def host_has_agent(self, host):
402 """
403 Determine if there is currently an Agent present using this host.
404 """
405 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000406
407
    def remove_agent(self, agent):
        """Remove *agent* from the active list and both id->agent indexes.

        Raises ValueError if the agent was never added (list.remove).
        """
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000414
415
    def _host_has_scheduled_special_task(self, host):
        # "Scheduled" here means queued: a SpecialTask for this host that is
        # neither active nor complete yet.
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))
420
421
    def _recover_processes(self):
        """Reattach to autoserv processes left over from a prior scheduler.

        Order matters: pidfiles must be registered before the drone manager
        refresh so the refresh can find them, and drones are reinitialized
        only after orphan handling has run.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000434
showard170873e2009-01-07 00:22:26 +0000435
    def _create_recovery_agent_tasks(self):
        # Recovery covers both active host queue entries and active special
        # tasks.
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))
439
440
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)
        stats.Gauge('scheduler.jobs_per_tick').send(
                'running', len(queue_entries))

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            # A synchronous job's task covers the whole group of entries.
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000479
480
    def _get_special_task_agent_tasks(self, is_active=False):
        """Return an AgentTask for each incomplete SpecialTask.

        @param is_active: filter on the SpecialTask is_active flag (True is
                used during recovery).
        """
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]
486
487
488 def _get_agent_task_for_queue_entry(self, queue_entry):
489 """
beeps8bb1f7d2013-08-05 01:30:09 -0700490 Construct an AgentTask instance for the given active HostQueueEntry.
491
showardd1195652009-12-08 22:21:02 +0000492 @param queue_entry: a HostQueueEntry
beeps8bb1f7d2013-08-05 01:30:09 -0700493 @return: an AgentTask to run the queue entry
showardd1195652009-12-08 22:21:02 +0000494 """
495 task_entries = queue_entry.job.get_group_entries(queue_entry)
496 self._check_for_duplicate_host_entries(task_entries)
497
498 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
499 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000500 if queue_entry.is_hostless():
501 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000502 return QueueTask(queue_entries=task_entries)
503 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
beeps5e2bb4a2013-10-28 11:26:45 -0700504 return postjob_task.GatherLogsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000505 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
beeps5e2bb4a2013-10-28 11:26:45 -0700506 return postjob_task.FinalReparseTask(queue_entries=task_entries)
mbligh4608b002010-01-05 18:22:35 +0000507 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
beeps5e2bb4a2013-10-28 11:26:45 -0700508 return postjob_task.ArchiveResultsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000509
Dale Curtisaa513362011-03-01 17:27:44 -0800510 raise host_scheduler.SchedulerError(
511 '_get_agent_task_for_queue_entry got entry with '
512 'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000513
514
    def _check_for_duplicate_host_entries(self, task_entries):
        """Assert no entry's host already has an agent.

        Parsing/archiving entries no longer occupy their host, so they are
        exempt from the check.
        """
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)
523
524
    def _assert_host_has_no_agent(self, entry):
        """Raise SchedulerError if entry's host already has an agent.

        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            # Pick an arbitrary existing agent for the error message.
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))
534
535
    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
        object contains a hqe it's passed on to the special agent task, which
        creates a HostQueueEntry and saves it as its queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        @raises host_scheduler.SchedulerError: if no AgentTask class matches
                the task's type.
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        # Dispatch on the class's declared TASK_TYPE.
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000566
567
568 def _register_pidfiles(self, agent_tasks):
569 for agent_task in agent_tasks:
570 agent_task.register_necessary_pidfiles()
571
572
    def _recover_tasks(self, agent_tasks):
        """Reattach recovery tasks to their processes; report true orphans.

        Any autoserv process not claimed by a recovered task remains in the
        orphan set and is handed to _check_for_remaining_orphan_processes.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                # This task owns its process, so it's not an orphan.
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000583
584
    def _get_unassigned_entries(self, status):
        """Yield HostQueueEntries in *status* that no agent is handling."""
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                           % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry
592
593
    def _check_for_remaining_orphan_processes(self, orphans):
        """Email about unrecovered orphan processes; optionally die.

        @param orphans: iterable of process objects that no recovered task
                claimed.
        @raises RuntimeError: when the 'die_on_orphans' config flag is set
                and orphans remain.
        """
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000606
showard170873e2009-01-07 00:22:26 +0000607
    def _recover_pending_entries(self):
        """Re-fire on_pending() for Pending entries that lost their agent."""
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()
613
614
showardb8900452009-10-12 20:31:01 +0000615 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000616 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000617 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
618 unrecovered_hqes = []
619 for queue_entry in queue_entries:
620 special_tasks = models.SpecialTask.objects.filter(
621 task__in=(models.SpecialTask.Task.CLEANUP,
622 models.SpecialTask.Task.VERIFY),
623 queue_entry__id=queue_entry.id,
624 is_complete=False)
625 if special_tasks.count() == 0:
626 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000627
showardb8900452009-10-12 20:31:01 +0000628 if unrecovered_hqes:
629 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Dale Curtisaa513362011-03-01 17:27:44 -0800630 raise host_scheduler.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000631 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000632 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000633
634
showard65db3932009-10-28 19:54:35 +0000635 def _get_prioritized_special_tasks(self):
636 """
637 Returns all queued SpecialTasks prioritized for repair first, then
638 cleanup, then verify.
beeps8bb1f7d2013-08-05 01:30:09 -0700639
640 @return: list of afe.models.SpecialTasks sorted according to priority.
showard65db3932009-10-28 19:54:35 +0000641 """
642 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
643 is_complete=False,
644 host__locked=False)
645 # exclude hosts with active queue entries unless the SpecialTask is for
646 # that queue entry
showard7e67b432010-01-20 01:13:04 +0000647 queued_tasks = models.SpecialTask.objects.add_join(
showardeab66ce2009-12-23 00:03:56 +0000648 queued_tasks, 'afe_host_queue_entries', 'host_id',
649 join_condition='afe_host_queue_entries.active',
showard7e67b432010-01-20 01:13:04 +0000650 join_from_key='host_id', force_left_join=True)
showard65db3932009-10-28 19:54:35 +0000651 queued_tasks = queued_tasks.extra(
showardeab66ce2009-12-23 00:03:56 +0000652 where=['(afe_host_queue_entries.id IS NULL OR '
653 'afe_host_queue_entries.id = '
654 'afe_special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000655
showard65db3932009-10-28 19:54:35 +0000656 # reorder tasks by priority
657 task_priority_order = [models.SpecialTask.Task.REPAIR,
658 models.SpecialTask.Task.CLEANUP,
Dan Shi07e09af2013-04-12 09:31:29 -0700659 models.SpecialTask.Task.VERIFY,
Alex Millerdfff2fd2013-05-28 13:05:06 -0700660 models.SpecialTask.Task.RESET,
661 models.SpecialTask.Task.PROVISION]
showard65db3932009-10-28 19:54:35 +0000662 def task_priority_key(task):
663 return task_priority_order.index(task.task)
664 return sorted(queued_tasks, key=task_priority_key)
665
666
showard65db3932009-10-28 19:54:35 +0000667 def _schedule_special_tasks(self):
668 """
669 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700670
671 Special tasks include PreJobTasks like verify, reset and cleanup.
672 They are created through _schedule_new_jobs and associated with a hqe
673 This method translates SpecialTasks to the appropriate AgentTask and
674 adds them to the dispatchers agents list, so _handle_agents can execute
675 them.
showard65db3932009-10-28 19:54:35 +0000676 """
677 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000678 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000679 continue
showardd1195652009-12-08 22:21:02 +0000680 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000681
682
showard170873e2009-01-07 00:22:26 +0000683 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000684 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000685 # should never happen
showarded2afea2009-07-07 20:54:07 +0000686 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000687 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000688 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700689 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000690 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000691
692
jadmanski0afbb632008-06-06 21:10:57 +0000693 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000694 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700695 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000696 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000697 if self.host_has_agent(host):
698 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000699 continue
showard8cc058f2009-09-08 16:26:33 +0000700 if self._host_has_scheduled_special_task(host):
701 # host will have a special task scheduled on the next cycle
702 continue
showard170873e2009-01-07 00:22:26 +0000703 if print_message:
showardb18134f2009-03-20 20:52:18 +0000704 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000705 models.SpecialTask.objects.create(
706 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000707 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000708
709
jadmanski0afbb632008-06-06 21:10:57 +0000710 def _recover_hosts(self):
711 # recover "Repair Failed" hosts
712 message = 'Reverifying dead host %s'
713 self._reverify_hosts_where("status = 'Repair Failed'",
714 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000715
716
showard04c82c52008-05-29 19:38:12 +0000717
showardb95b1bd2008-08-15 18:11:04 +0000718 def _get_pending_queue_entries(self):
beeps7d8a1b12013-10-29 17:58:34 -0700719 """
720 Fetch a list of new host queue entries.
721
722 The ordering of this list is important, as every new agent
723 we schedule can potentially contribute to the process count
724 on the drone, which has a static limit. The sort order
725 prioritizes jobs as follows:
726 1. High priority jobs: Based on the afe_job's priority
727 2. With hosts and metahosts: This will only happen if we don't
728 activate the hqe after assigning a host to it in
729 schedule_new_jobs.
730 3. With hosts but without metahosts: When tests are scheduled
731 through the frontend the owner of the job would have chosen
732 a host for it.
733 4. Without hosts but with metahosts: This is the common case of
734 a new test that needs a DUT. We assign a host and set it to
735 active so it shouldn't show up in case 2 on the next tick.
736 5. Without hosts and without metahosts: Hostless suite jobs, that
737 will result in new jobs that fall under category 4.
738
739 A note about the ordering of cases 3 and 4:
740 Prioritizing one case above the other leads to earlier acquisition
741 of the following resources: 1. process slots on the drone 2. machines.
742 - When a user schedules a job through the afe they choose a specific
743 host for it. Jobs with metahost can utilize any host that satisfies
744 the metahost criterion. This means that if we had scheduled 4 before
745 3 there is a good chance that a job which could've used another host,
746 will now use the host assigned to a metahost-less job. Given the
747 availability of machines in pool:suites, this almost guarantees
748 starvation for jobs scheduled through the frontend.
749 - Scheduling 4 before 3 also has its pros however, since a suite
750 has the concept of a time out, whereas users can wait. If we hit the
751 process count on the drone a suite can timeout waiting on the test,
752 but a user job generally has a much longer timeout, and relatively
753 harmless consequences.
754 The current ordering was chosed because it is more likely that we will
755 run out of machines in pool:suites than processes on the drone.
756
757 @returns A list of HQEs ordered according to sort_order.
758 """
759 sort_order = ('afe_jobs.priority DESC, '
760 'ISNULL(host_id), '
761 'ISNULL(meta_host), '
Alex Millerd3614042014-01-13 15:58:18 -0800762 'parent_job_id, '
beeps7d8a1b12013-10-29 17:58:34 -0700763 'job_id')
beeps7d8273b2013-11-06 09:44:34 -0800764 query=('NOT complete AND NOT active AND status="Queued"'
765 'AND NOT aborted')
jamesrenc44ae992010-02-19 00:12:54 +0000766 return list(scheduler_models.HostQueueEntry.fetch(
showardeab66ce2009-12-23 00:03:56 +0000767 joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
beeps7d8273b2013-11-06 09:44:34 -0800768 where=query, order_by=sort_order))
mbligh36768f02008-02-22 18:28:33 +0000769
770
showard89f84db2009-03-12 20:39:13 +0000771 def _refresh_pending_queue_entries(self):
772 """
773 Lookup the pending HostQueueEntries and call our HostScheduler
774 refresh() method given that list. Return the list.
775
776 @returns A list of pending HostQueueEntries sorted in priority order.
777 """
showard63a34772008-08-18 19:32:50 +0000778 queue_entries = self._get_pending_queue_entries()
779 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000780 return []
showardb95b1bd2008-08-15 18:11:04 +0000781
showard63a34772008-08-18 19:32:50 +0000782 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000783
showard89f84db2009-03-12 20:39:13 +0000784 return queue_entries
785
786
787 def _schedule_atomic_group(self, queue_entry):
788 """
789 Schedule the given queue_entry on an atomic group of hosts.
790
791 Returns immediately if there are insufficient available hosts.
792
793 Creates new HostQueueEntries based off of queue_entry for the
794 scheduled hosts and starts them all running.
795 """
796 # This is a virtual host queue entry representing an entire
797 # atomic group, find a group and schedule their hosts.
798 group_hosts = self._host_scheduler.find_eligible_atomic_group(
799 queue_entry)
800 if not group_hosts:
801 return
showardcbe6f942009-06-17 19:33:49 +0000802
803 logging.info('Expanding atomic group entry %s with hosts %s',
804 queue_entry,
805 ', '.join(host.hostname for host in group_hosts))
jamesren883492a2010-02-12 00:45:18 +0000806
showard89f84db2009-03-12 20:39:13 +0000807 for assigned_host in group_hosts[1:]:
808 # Create a new HQE for every additional assigned_host.
jamesrenc44ae992010-02-19 00:12:54 +0000809 new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000810 new_hqe.save()
jamesren883492a2010-02-12 00:45:18 +0000811 new_hqe.set_host(assigned_host)
812 self._run_queue_entry(new_hqe)
813
814 # The first assigned host uses the original HostQueueEntry
815 queue_entry.set_host(group_hosts[0])
816 self._run_queue_entry(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000817
818
showarda9545c02009-12-18 22:44:26 +0000819 def _schedule_hostless_job(self, queue_entry):
820 self.add_agent_task(HostlessQueueTask(queue_entry))
jamesren47bd7372010-03-13 00:58:17 +0000821 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000822
823
showard89f84db2009-03-12 20:39:13 +0000824 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700825 """
826 Find any new HQEs and call schedule_pre_job_tasks for it.
827
828 This involves setting the status of the HQE and creating a row in the
829 db corresponding the the special task, through
830 scheduler_models._queue_special_task. The new db row is then added as
831 an agent to the dispatcher through _schedule_special_tasks and
832 scheduled for execution on the drone through _handle_agents.
833 """
showard89f84db2009-03-12 20:39:13 +0000834 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000835
beepsb255fc52013-10-13 23:28:54 -0700836 new_hostless_jobs = 0
837 new_atomic_groups = 0
838 new_jobs_with_hosts = 0
839 new_jobs_need_hosts = 0
840
Simran Basi3f6717d2012-09-13 15:21:22 -0700841 logging.debug('Processing %d queue_entries', len(queue_entries))
showard63a34772008-08-18 19:32:50 +0000842 for queue_entry in queue_entries:
Simran Basidef92872012-09-20 13:34:34 -0700843 self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
showarde55955f2009-10-07 20:48:58 +0000844 is_unassigned_atomic_group = (
845 queue_entry.atomic_group_id is not None
846 and queue_entry.host_id is None)
jamesren883492a2010-02-12 00:45:18 +0000847
848 if queue_entry.is_hostless():
Simran Basidef92872012-09-20 13:34:34 -0700849 self._log_extra_msg('Scheduling hostless job.')
showarda9545c02009-12-18 22:44:26 +0000850 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700851 new_hostless_jobs = new_hostless_jobs + 1
jamesren883492a2010-02-12 00:45:18 +0000852 elif is_unassigned_atomic_group:
853 self._schedule_atomic_group(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700854 new_atmoic_groups = new_atomic_groups + 1
showarde55955f2009-10-07 20:48:58 +0000855 else:
beepsb255fc52013-10-13 23:28:54 -0700856 new_jobs_need_hosts = new_jobs_need_hosts + 1
jamesren883492a2010-02-12 00:45:18 +0000857 assigned_host = self._host_scheduler.schedule_entry(queue_entry)
beeps02377932014-01-15 13:18:28 -0800858 if assigned_host:
859 # If we ever find ourselves in a position where a ready host
860 # has an agent, roll back the host assignment and try again
861 # next tick.
862 if self.host_has_agent(assigned_host):
863 host_agent_task = [host_agent.task for host_agent in
864 list(self._host_agents.get(
865 assigned_host.id))][0]
866 subject = 'Host with agents assigned to an HQE'
867 message = ('HQE: %s assigned host %s, but the host has '
868 'agent: %s for queue_entry %s. The HQE '
869 'will remain in a queued state till the '
870 'the host is usable.' %
871 (queue_entry, assigned_host.hostname,
872 host_agent_task,
873 host_agent_task.queue_entry))
874 email_manager.manager.enqueue_notify_email(subject, message)
875 queue_entry.set_host(None)
876 queue_entry.update_field('active', False)
877 else:
878 assert assigned_host.id == queue_entry.host_id
879 self._run_queue_entry(queue_entry)
880 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700881
882 key = 'scheduler.jobs_per_tick'
883 stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
884 stats.Gauge(key).send('new_atomic_groups', new_atomic_groups)
885 stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
886 stats.Gauge(key).send('new_jobs_without_hosts',
887 new_jobs_need_hosts - new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000888
889
showard8cc058f2009-09-08 16:26:33 +0000890 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700891 """
892 Adds agents to the dispatcher.
893
894 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
895 QueueTask for example, will have a job with a control file, and
896 the agent will have methods that poll, abort and check if the queue
897 task is finished. The dispatcher runs the agent_task, as well as
898 other agents in it's _agents member, through _handle_agents, by
899 calling the Agents tick().
900
901 This method creates an agent for each HQE in one of (starting, running,
902 gathering, parsing, archiving) states, and adds it to the dispatcher so
903 it is handled by _handle_agents.
904 """
showardd1195652009-12-08 22:21:02 +0000905 for agent_task in self._get_queue_entry_agent_tasks():
906 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000907
908
909 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000910 for entry in scheduler_models.HostQueueEntry.fetch(
911 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000912 task = entry.job.schedule_delayed_callback_task(entry)
913 if task:
showardd1195652009-12-08 22:21:02 +0000914 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000915
916
jamesren883492a2010-02-12 00:45:18 +0000917 def _run_queue_entry(self, queue_entry):
Simran Basidef92872012-09-20 13:34:34 -0700918 self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
919 queue_entry)
jamesren883492a2010-02-12 00:45:18 +0000920 queue_entry.schedule_pre_job_tasks()
mblighd5c95802008-03-05 00:33:46 +0000921
922
jadmanski0afbb632008-06-06 21:10:57 +0000923 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700924 """
925 Looks through the afe_host_queue_entries for an aborted entry.
926
927 The aborted bit is set on an HQE in many ways, the most common
928 being when a user requests an abort through the frontend, which
929 results in an rpc from the afe to abort_host_queue_entries.
930 """
jamesrene7c65cb2010-06-08 20:38:10 +0000931 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000932 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700933 where='aborted=1 and complete=0'):
showardf4a2e502009-07-28 20:06:39 +0000934 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800935
936 # The task would have started off with both is_complete and
937 # is_active = False. Aborted tasks are neither active nor complete.
938 # For all currently active tasks this will happen through the agent,
939 # but we need to manually update the special tasks that haven't
940 # started yet, because they don't have agents.
941 models.SpecialTask.objects.filter(is_active=False,
942 queue_entry_id=entry.id).update(is_complete=True)
943
showardd3dc1992009-04-22 21:01:40 +0000944 for agent in self.get_agents_for_entry(entry):
945 agent.abort()
946 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000947 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700948 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000949 for job in jobs_to_stop:
950 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000951
952
beeps8bb1f7d2013-08-05 01:30:09 -0700953 def _find_aborted_special_tasks(self):
954 """
955 Find SpecialTasks that have been marked for abortion.
956
957 Poll the database looking for SpecialTasks that are active
958 and have been marked for abortion, then abort them.
959 """
960
961 # The completed and active bits are very important when it comes
962 # to scheduler correctness. The active bit is set through the prolog
963 # of a special task, and reset through the cleanup method of the
964 # SpecialAgentTask. The cleanup is called both through the abort and
965 # epilog. The complete bit is set in several places, and in general
966 # a hanging job will have is_active=1 is_complete=0, while a special
967 # task which completed will have is_active=0 is_complete=1. To check
968 # aborts we directly check active because the complete bit is set in
969 # several places, including the epilog of agent tasks.
970 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
971 is_aborted=True)
972 for task in aborted_tasks:
973 # There are 2 ways to get the agent associated with a task,
974 # through the host and through the hqe. A special task
975 # always needs a host, but doesn't always need a hqe.
976 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700977 if isinstance(agent.task, agent_task.SpecialAgentTask):
beeps8bb1f7d2013-08-05 01:30:09 -0700978
979 # The epilog preforms critical actions such as
980 # queueing the next SpecialTask, requeuing the
981 # hqe etc, however it doesn't actually kill the
982 # monitor process and set the 'done' bit. Epilogs
983 # assume that the job failed, and that the monitor
984 # process has already written an exit code. The
985 # done bit is a necessary condition for
986 # _handle_agents to schedule any more special
987 # tasks against the host, and it must be set
988 # in addition to is_active, is_complete and success.
989 agent.task.epilog()
990 agent.task.abort()
991
992
showard324bf812009-01-20 23:23:38 +0000993 def _can_start_agent(self, agent, num_started_this_cycle,
994 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000995 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000996 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000997 return True
998 # don't allow any nonzero-process agents to run after we've reached a
999 # limit (this avoids starvation of many-process agents)
1000 if have_reached_limit:
1001 return False
1002 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001003 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +00001004 agent.task.owner_username,
1005 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +00001006 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001007 return False
1008 # if a single agent exceeds the per-cycle throttling, still allow it to
1009 # run when it's the first agent in the cycle
1010 if num_started_this_cycle == 0:
1011 return True
1012 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +00001013 if (num_started_this_cycle + agent.task.num_processes >
1014 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001015 return False
1016 return True
1017
1018
jadmanski0afbb632008-06-06 21:10:57 +00001019 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -07001020 """
1021 Handles agents of the dispatcher.
1022
1023 Appropriate Agents are added to the dispatcher through
1024 _schedule_running_host_queue_entries. These agents each
1025 have a task. This method runs the agents task through
1026 agent.tick() leading to:
1027 agent.start
1028 prolog -> AgentTasks prolog
1029 For each queue entry:
1030 sets host status/status to Running
1031 set started_on in afe_host_queue_entries
1032 run -> AgentTasks run
1033 Creates PidfileRunMonitor
1034 Queues the autoserv command line for this AgentTask
1035 via the drone manager. These commands are executed
1036 through the drone managers execute actions.
1037 poll -> AgentTasks/BaseAgentTask poll
1038 checks the monitors exit_code.
1039 Executes epilog if task is finished.
1040 Executes AgentTasks _finish_task
1041 finish_task is usually responsible for setting the status
1042 of the HQE/host, and updating it's active and complete fileds.
1043
1044 agent.is_done
1045 Removed the agent from the dispatchers _agents queue.
1046 Is_done checks the finished bit on the agent, that is
1047 set based on the Agents task. During the agents poll
1048 we check to see if the monitor process has exited in
1049 it's finish method, and set the success member of the
1050 task based on this exit code.
1051 """
jadmanski0afbb632008-06-06 21:10:57 +00001052 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001053 have_reached_limit = False
1054 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -07001055 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +00001056 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -07001057 self._log_extra_msg('Processing Agent with Host Ids: %s and '
1058 'queue_entry ids:%s' % (agent.host_ids,
1059 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +00001060 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001061 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001062 have_reached_limit):
1063 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -07001064 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +00001065 continue
showardd1195652009-12-08 22:21:02 +00001066 num_started_this_cycle += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001067 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +00001068 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -07001069 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +00001070 if agent.is_done():
Simran Basidef92872012-09-20 13:34:34 -07001071 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +00001072 self.remove_agent(agent)
Simran Basi3f6717d2012-09-13 15:21:22 -07001073 logging.info('%d running processes. %d added this cycle.',
1074 _drone_manager.total_running_processes(),
1075 num_started_this_cycle)
mbligh36768f02008-02-22 18:28:33 +00001076
1077
showard29f7cd22009-04-29 21:16:24 +00001078 def _process_recurring_runs(self):
1079 recurring_runs = models.RecurringRun.objects.filter(
1080 start_date__lte=datetime.datetime.now())
1081 for rrun in recurring_runs:
1082 # Create job from template
1083 job = rrun.job
1084 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001085 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001086
1087 host_objects = info['hosts']
1088 one_time_hosts = info['one_time_hosts']
1089 metahost_objects = info['meta_hosts']
1090 dependencies = info['dependencies']
1091 atomic_group = info['atomic_group']
1092
1093 for host in one_time_hosts or []:
1094 this_host = models.Host.create_one_time_host(host.hostname)
1095 host_objects.append(this_host)
1096
1097 try:
1098 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001099 options=options,
showard29f7cd22009-04-29 21:16:24 +00001100 host_objects=host_objects,
1101 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001102 atomic_group=atomic_group)
1103
1104 except Exception, ex:
1105 logging.exception(ex)
1106 #TODO send email
1107
1108 if rrun.loop_count == 1:
1109 rrun.delete()
1110 else:
1111 if rrun.loop_count != 0: # if not infinite loop
1112 # calculate new start_date
1113 difference = datetime.timedelta(seconds=rrun.loop_period)
1114 rrun.start_date = rrun.start_date + difference
1115 rrun.loop_count -= 1
1116 rrun.save()
1117
1118
Simran Basia858a232012-08-21 11:04:37 -07001119SiteDispatcher = utils.import_site_class(
1120 __file__, 'autotest_lib.scheduler.site_monitor_db',
1121 'SiteDispatcher', BaseDispatcher)
1122
1123class Dispatcher(SiteDispatcher):
1124 pass
1125
1126
mbligh36768f02008-02-22 18:28:33 +00001127class Agent(object):
showard77182562009-06-10 00:16:05 +00001128 """
Alex Miller47715eb2013-07-24 03:34:01 -07001129 An agent for use by the Dispatcher class to perform a task. An agent wraps
1130 around an AgentTask mainly to associate the AgentTask with the queue_entry
1131 and host ids.
showard77182562009-06-10 00:16:05 +00001132
1133 The following methods are required on all task objects:
1134 poll() - Called periodically to let the task check its status and
1135 update its internal state. If the task succeeded.
1136 is_done() - Returns True if the task is finished.
1137 abort() - Called when an abort has been requested. The task must
1138 set its aborted attribute to True if it actually aborted.
1139
1140 The following attributes are required on all task objects:
1141 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001142 success - bool, True if this task succeeded.
1143 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1144 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001145 """
1146
1147
showard418785b2009-11-23 20:19:59 +00001148 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001149 """
Alex Miller47715eb2013-07-24 03:34:01 -07001150 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001151 """
showard8cc058f2009-09-08 16:26:33 +00001152 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001153
showard77182562009-06-10 00:16:05 +00001154 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001155 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001156
showard8cc058f2009-09-08 16:26:33 +00001157 self.queue_entry_ids = task.queue_entry_ids
1158 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001159
showard8cc058f2009-09-08 16:26:33 +00001160 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001161 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001162
1163
jadmanski0afbb632008-06-06 21:10:57 +00001164 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001165 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001166 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001167 self.task.poll()
1168 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001169 self.finished = True
showardec113162008-05-08 00:52:49 +00001170
1171
jadmanski0afbb632008-06-06 21:10:57 +00001172 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001173 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001174
1175
showardd3dc1992009-04-22 21:01:40 +00001176 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001177 if self.task:
1178 self.task.abort()
1179 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001180 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001181 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001182
showardd3dc1992009-04-22 21:01:40 +00001183
beeps5e2bb4a2013-10-28 11:26:45 -07001184class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001185 """
1186 Common functionality for QueueTask and HostlessQueueTask
1187 """
    def __init__(self, queue_entries):
        # All queue entries belong to the same job; cache it off the first.
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001192
1193
showard73ec0442009-02-07 02:05:20 +00001194 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001195 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001196
1197
jamesrenc44ae992010-02-19 00:12:54 +00001198 def _write_control_file(self, execution_path):
1199 control_path = _drone_manager.attach_file_to_execution(
1200 execution_path, self.job.control_file)
1201 return control_path
1202
1203
Aviv Keshet308e7362013-05-21 14:43:16 -07001204 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001205 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001206 execution_path = self.queue_entries[0].execution_path()
1207 control_path = self._write_control_file(execution_path)
1208 hostnames = ','.join(entry.host.hostname
1209 for entry in self.queue_entries
1210 if not entry.is_hostless())
1211
1212 execution_tag = self.queue_entries[0].execution_tag()
1213 params = _autoserv_command_line(
1214 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001215 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001216 _drone_manager.absolute_path(control_path)],
1217 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001218 if self.job.is_image_update_job():
1219 params += ['--image', self.job.update_image_path]
1220
jamesrenc44ae992010-02-19 00:12:54 +00001221 return params
showardd1195652009-12-08 22:21:02 +00001222
1223
    @property
    def num_processes(self):
        # One process slot per queue entry in the group; used by the
        # dispatcher's throttling in _can_start_agent.
        return len(self.queue_entries)
1227
1228
    @property
    def owner_username(self):
        # Job owner; consumed by per-user drone throttling
        # (_drone_manager.max_runnable_processes).
        return self.job.owner
1232
1233
    def _working_directory(self):
        # The execution path shared consistently by all queue entries.
        return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001236
1237
jadmanski0afbb632008-06-06 21:10:57 +00001238 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001239 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001240 keyval_dict = self.job.keyval_dict()
1241 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001242 group_name = self.queue_entries[0].get_group_name()
1243 if group_name:
1244 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001245 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001246 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001247 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001248 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001249
1250
showard35162b02009-03-03 02:17:30 +00001251 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001252 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001253 _drone_manager.write_lines_to_file(error_file_path,
1254 [_LOST_PROCESS_ERROR])
1255
1256
showardd3dc1992009-04-22 21:01:40 +00001257 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001258 if not self.monitor:
1259 return
1260
showardd9205182009-04-27 20:09:55 +00001261 self._write_job_finished()
1262
showard35162b02009-03-03 02:17:30 +00001263 if self.monitor.lost_process:
1264 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001265
jadmanskif7fa2cc2008-10-01 14:13:23 +00001266
showardcbd74612008-11-19 21:42:02 +00001267 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001268 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001269 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001270 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001271 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001272
1273
jadmanskif7fa2cc2008-10-01 14:13:23 +00001274 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001275 if not self.monitor or not self.monitor.has_process():
1276 return
1277
jadmanskif7fa2cc2008-10-01 14:13:23 +00001278 # build up sets of all the aborted_by and aborted_on values
1279 aborted_by, aborted_on = set(), set()
1280 for queue_entry in self.queue_entries:
1281 if queue_entry.aborted_by:
1282 aborted_by.add(queue_entry.aborted_by)
1283 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1284 aborted_on.add(t)
1285
1286 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001287 # TODO(showard): this conditional is now obsolete, we just need to leave
1288 # it in temporarily for backwards compatibility over upgrades. delete
1289 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001290 assert len(aborted_by) <= 1
1291 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001292 aborted_by_value = aborted_by.pop()
1293 aborted_on_value = max(aborted_on)
1294 else:
1295 aborted_by_value = 'autotest_system'
1296 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001297
showarda0382352009-02-11 23:36:43 +00001298 self._write_keyval_after_job("aborted_by", aborted_by_value)
1299 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001300
showardcbd74612008-11-19 21:42:02 +00001301 aborted_on_string = str(datetime.datetime.fromtimestamp(
1302 aborted_on_value))
1303 self._write_status_comment('Job aborted by %s on %s' %
1304 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001305
1306
    def abort(self):
        # Order matters here: the base-class abort runs first (presumably it
        # stops the underlying process -- base class is outside this chunk,
        # confirm), then the abort details and end-of-job bookkeeping are
        # recorded while self.monitor is still available.
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()
showard21baa452008-10-21 00:08:39 +00001311
1312
    def epilog(self):
        """Run the standard epilog, then this task's end-of-job bookkeeping."""
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001316
1317
class QueueTask(AbstractQueueTask):
    """AbstractQueueTask for jobs that run against one or more real hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Verify entry/host statuses, then mark hosts Running and dirty."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            host = entry.host
            self._write_host_keyvals(host)
            host.set_status(models.Host.Status.RUNNING)
            host.update_field('dirty', 1)


    def _finish_task(self):
        """Move entries to Gathering; hosts stay Running for log collection."""
        super(QueueTask, self)._finish_task()

        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        # On top of the standard invocation, ask autoserv to verify the
        # job repo url.
        base_invocation = super(QueueTask, self)._command_line()
        return base_invocation + ['--verify_job_repo_url']
1351
1352
class HostlessQueueTask(AbstractQueueTask):
    """AbstractQueueTask for jobs whose single queue entry has no host."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        # Hostless entries have no per-host subdir; use a fixed name so the
        # execution path is still well defined.
        self.queue_entries[0].update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()

        # Jobs enter the database in Starting status.  Each scheduler tick
        # tries to start them; if a limit is hit (e.g.
        # max_hostless_jobs_per_drone, max_jobs_started_per_cycle) the entry
        # is left in Starting and no autoserv process exists for it.  In that
        # case there is nothing to parse and execution_subdir is still unset,
        # so moving the entry to Parsing would make the scheduler raise.
        # Only advance entries that actually got past Starting.
        entry = self.queue_entries[0]
        if entry.status != models.HostQueueEntry.Status.STARTING:
            entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001381
1382
if __name__ == '__main__':
    # Script entry point; main() is defined elsewhere in this module
    # (outside this chunk).
    main()