blob: 31efe83dcc01aff068f49a75c1a04ffb36949573 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
Aviv Keshet225bdfe2013-03-05 10:10:08 -08002#pylint: disable-msg=C0111
mbligh36768f02008-02-22 18:28:33 +00003
4"""
5Autotest scheduler
6"""
showard909c7a62008-07-15 21:52:38 +00007
Aviv Keshet225bdfe2013-03-05 10:10:08 -08008import datetime, optparse, os, signal
beeps5e2bb4a2013-10-28 11:26:45 -07009import sys, time
Aviv Keshet225bdfe2013-03-05 10:10:08 -080010import logging, gc
showard402934a2009-12-21 22:20:47 +000011
Alex Miller05d7b4c2013-03-04 07:49:38 -080012import common
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000014
15import django.db
16
Prashanth B0e960282014-05-13 19:38:28 -070017from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070018from autotest_lib.client.common_lib import utils
Prashanth B0e960282014-05-13 19:38:28 -070019from autotest_lib.frontend.afe import models, rpc_utils
beeps5e2bb4a2013-10-28 11:26:45 -070020from autotest_lib.scheduler import agent_task, drone_manager, drones
21from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
22from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070023from autotest_lib.scheduler import postjob_task
beepscc9fc702013-12-02 12:45:38 -080024from autotest_lib.scheduler import rdb_lib
25from autotest_lib.scheduler import rdb_utils
Prashanth B0e960282014-05-13 19:38:28 -070026from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000027from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080028from autotest_lib.scheduler import status_server, scheduler_config
Aviv Keshet308e7362013-05-21 14:43:16 -070029from autotest_lib.server import autoserv_utils
Fang Deng1d6c2a02013-04-17 15:25:45 -070030from autotest_lib.site_utils.graphite import stats
Alex Miller05d7b4c2013-03-04 07:49:38 -080031
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Prefer an explicitly configured autotest root over the path derived from
# this file's location.  ('in' replaces the deprecated dict.has_key(), which
# was removed in Python 3; behavior is identical.)
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db_manager = None
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None
mbligh36768f02008-02-22 18:28:33 +000062
def _parser_path_default(install_dir):
    """Return the default path of the TKO results parser under install_dir."""
    return os.path.join(install_dir, 'tko', 'parse')
# A site-specific module may override where the parser lives; fall back to
# the default above when no site hook is defined.
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
69
mbligh36768f02008-02-22 18:28:33 +000070
mbligh83c1e9e2009-05-01 23:10:41 +000071def _site_init_monitor_db_dummy():
72 return {}
73
74
def _verify_default_drone_set_exists():
    """Fail fast when drone sets are enabled without a default drone set."""
    drone_sets_on = models.DroneSet.drone_sets_enabled()
    if drone_sets_on and not models.DroneSet.default_drone_set_name():
        raise scheduler_lib.SchedulerError(
                'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000080
81
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    # Currently the only consistency check is the drone-set default.
    _verify_default_drone_set_exists()
85
86
def main():
    """Top-level scheduler entry point.

    Delegates to main_without_exception_handling(); any exception that
    escapes (other than SystemExit) is logged before being re-raised, and
    the scheduler pidfile is always removed on the way out.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000098
99
def main_without_exception_handling():
    """Parse options, verify configuration, and run the dispatcher loop.

    Exits early (with a usage message) when the single results_dir argument
    is missing, and exits with status 1 when the scheduler is disabled in
    the global config.  The loop runs until _shutdown is set (SIGINT) or the
    status server requests a shutdown.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Give a site-specific hook a chance to run before anything else.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        # Swap in the dummy autoserv; tests rely on these module globals.
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop: one tick per iteration until a shutdown is requested
        # either via SIGINT (_shutdown) or through the status server.
        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Orderly teardown: flush emails, stop the status server, then shut down
    # the drone manager and DB connection.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000167
168
def handle_sigint(signum, frame):
    """SIGINT handler: request a graceful shutdown of the dispatcher loop.

    Only sets the module-level _shutdown flag; the main loop in
    main_without_exception_handling polls it between ticks.
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000173
174
def initialize():
    """One-time scheduler startup.

    Writes the pidfile (aborting if another monitor_db already holds it),
    optionally points the DB config at the stress-test database when in
    testing mode, creates the DB connection manager, installs the SIGINT
    handler, and initializes the drone manager and scheduler models.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start a second scheduler against the same pidfile.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    # Make sure the server dir is on PATH for child processes.
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    # Drone configuration: comma-separated drone hostnames plus the host
    # that stores results; both default to localhost.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000206
207
def initialize_globals():
    """Bind the module-level _drone_manager to the process-wide instance."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
211
212
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv invocation for the given machines.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @returns The autoserv command line as a list of executable + parameters.
    """
    return autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
showard87ba02a2009-04-20 19:37:32 +0000230
231
Simran Basia858a232012-08-21 11:04:37 -0700232class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800233
234
    def __init__(self):
        # Active Agent objects currently being driven by the dispatcher.
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(
                _db_manager.get_connection())
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db_manager.get_connection(), user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db_manager.get_connection())
        # Indexes from host id / queue-entry id to the set of agents touching
        # them; maintained by _register_agent_for_ids and
        # _unregister_agent_for_ids.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        # Rate-limiting state for _garbage_collection().
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        # Debug-logging switches consumed by _log_tick_msg/_log_extra_msg.
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)
mbligh36768f02008-02-22 18:28:33 +0000259
mbligh36768f02008-02-22 18:28:33 +0000260
    def initialize(self, recover_hosts=True):
        """Prepare the dispatcher for its first tick.

        Initializes both cleanup handlers, always recovers orphaned
        processes, optionally recovers hosts, and lets the host scheduler
        perform its own startup recovery.

        @param recover_hosts: when True, also run _recover_hosts().
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()
272
mbligh36768f02008-02-22 18:28:33 +0000273
Simran Basi0ec94dd2012-08-28 09:50:10 -0700274 def _log_tick_msg(self, msg):
275 if self._tick_debug:
276 logging.debug(msg)
277
278
Simran Basidef92872012-09-20 13:34:34 -0700279 def _log_extra_msg(self, msg):
280 if self._extra_debugging:
281 logging.debug(msg)
282
283
    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.

        Steps run in a fixed order each tick: garbage collection, drone
        refresh, periodic cleanup, abort handling, scheduling (recurring
        runs, delay tasks, running entries, special tasks, new jobs), agent
        handling, host-scheduler tick, drone actions, and finally queued
        email delivery and Django query-cache reset.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        # The two steps below are individually timed for stats reporting.
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000325
showard97aed502008-11-04 02:01:24 +0000326
mblighf3294cc2009-04-08 21:17:38 +0000327 def _run_cleanup(self):
328 self._periodic_cleanup.run_cleanup_maybe()
329 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000330
mbligh36768f02008-02-22 18:28:33 +0000331
showardf13a9e22009-12-18 22:54:09 +0000332 def _garbage_collection(self):
333 threshold_time = time.time() - self._seconds_between_garbage_stats
334 if threshold_time < self._last_garbage_stats_time:
335 # Don't generate these reports very often.
336 return
337
338 self._last_garbage_stats_time = time.time()
339 # Force a full level 0 collection (because we can, it doesn't hurt
340 # at this interval).
341 gc.collect()
342 logging.info('Logging garbage collector stats on tick %d.',
343 self._tick_count)
344 gc_stats._log_garbage_collector_stats()
345
346
showard170873e2009-01-07 00:22:26 +0000347 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
348 for object_id in object_ids:
349 agent_dict.setdefault(object_id, set()).add(agent)
350
351
352 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
353 for object_id in object_ids:
354 assert object_id in agent_dict
355 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700356 # If an ID has no more active agent associated, there is no need to
357 # keep it in the dictionary. Otherwise, scheduler will keep an
358 # unnecessarily big dictionary until being restarted.
359 if not agent_dict[object_id]:
360 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000361
362
showardd1195652009-12-08 22:21:02 +0000363 def add_agent_task(self, agent_task):
beeps8bb1f7d2013-08-05 01:30:09 -0700364 """
365 Creates and adds an agent to the dispatchers list.
366
367 In creating the agent we also pass on all the queue_entry_ids and
368 host_ids from the special agent task. For every agent we create, we
369 add it to 1. a dict against the queue_entry_ids given to it 2. A dict
370 against the host_ids given to it. So theoritically, a host can have any
371 number of agents associated with it, and each of them can have any
372 special agent task, though in practice we never see > 1 agent/task per
373 host at any time.
374
375 @param agent_task: A SpecialTask for the agent to manage.
376 """
showardd1195652009-12-08 22:21:02 +0000377 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000378 self._agents.append(agent)
379 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000380 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
381 self._register_agent_for_ids(self._queue_entry_agents,
382 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000383
showard170873e2009-01-07 00:22:26 +0000384
385 def get_agents_for_entry(self, queue_entry):
386 """
387 Find agents corresponding to the specified queue_entry.
388 """
showardd3dc1992009-04-22 21:01:40 +0000389 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000390
391
392 def host_has_agent(self, host):
393 """
394 Determine if there is currently an Agent present using this host.
395 """
396 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000397
398
jadmanski0afbb632008-06-06 21:10:57 +0000399 def remove_agent(self, agent):
400 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000401 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
402 agent)
403 self._unregister_agent_for_ids(self._queue_entry_agents,
404 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000405
406
showard8cc058f2009-09-08 16:26:33 +0000407 def _host_has_scheduled_special_task(self, host):
408 return bool(models.SpecialTask.objects.filter(host__id=host.id,
409 is_active=False,
410 is_complete=False))
411
412
    def _recover_processes(self):
        """Reattach to work left over from a previous scheduler run.

        Builds recovery AgentTasks for active queue entries and special
        tasks, registers their pidfiles before refreshing the drone
        manager, recovers the tasks, then handles Pending and Verifying
        entries and re-verifies whatever hosts remain.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000425
showard170873e2009-01-07 00:22:26 +0000426
showardd1195652009-12-08 22:21:02 +0000427 def _create_recovery_agent_tasks(self):
428 return (self._get_queue_entry_agent_tasks()
429 + self._get_special_task_agent_tasks(is_active=True))
430
431
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)
        # Report the number of "running" entries for monitoring.
        stats.Gauge('scheduler.jobs_per_tick').send(
                'running', len(queue_entries))

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            # A task may cover a whole group of entries; mark them all used.
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks
469 return agent_tasks
showard170873e2009-01-07 00:22:26 +0000470
471
showardd1195652009-12-08 22:21:02 +0000472 def _get_special_task_agent_tasks(self, is_active=False):
473 special_tasks = models.SpecialTask.objects.filter(
474 is_active=is_active, is_complete=False)
475 return [self._get_agent_task_for_special_task(task)
476 for task in special_tasks]
477
478
479 def _get_agent_task_for_queue_entry(self, queue_entry):
480 """
beeps8bb1f7d2013-08-05 01:30:09 -0700481 Construct an AgentTask instance for the given active HostQueueEntry.
482
showardd1195652009-12-08 22:21:02 +0000483 @param queue_entry: a HostQueueEntry
beeps8bb1f7d2013-08-05 01:30:09 -0700484 @return: an AgentTask to run the queue entry
showardd1195652009-12-08 22:21:02 +0000485 """
486 task_entries = queue_entry.job.get_group_entries(queue_entry)
487 self._check_for_duplicate_host_entries(task_entries)
488
489 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
490 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000491 if queue_entry.is_hostless():
492 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000493 return QueueTask(queue_entries=task_entries)
494 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
beeps5e2bb4a2013-10-28 11:26:45 -0700495 return postjob_task.GatherLogsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000496 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
beeps5e2bb4a2013-10-28 11:26:45 -0700497 return postjob_task.FinalReparseTask(queue_entries=task_entries)
mbligh4608b002010-01-05 18:22:35 +0000498 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
beeps5e2bb4a2013-10-28 11:26:45 -0700499 return postjob_task.ArchiveResultsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000500
Prashanth B0e960282014-05-13 19:38:28 -0700501 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800502 '_get_agent_task_for_queue_entry got entry with '
503 'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000504
505
506 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000507 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
508 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000509 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000510 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000511 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000512 if using_host:
showardd1195652009-12-08 22:21:02 +0000513 self._assert_host_has_no_agent(task_entry)
514
515
516 def _assert_host_has_no_agent(self, entry):
517 """
518 @param entry: a HostQueueEntry or a SpecialTask
519 """
520 if self.host_has_agent(entry.host):
521 agent = tuple(self._host_agents.get(entry.host.id))[0]
Prashanth B0e960282014-05-13 19:38:28 -0700522 raise scheduler_lib.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000523 'While scheduling %s, host %s already has a host agent %s'
524 % (entry, entry.host, agent.task))
525
526
527 def _get_agent_task_for_special_task(self, special_task):
528 """
529 Construct an AgentTask class to run the given SpecialTask and add it
530 to this dispatcher.
beeps8bb1f7d2013-08-05 01:30:09 -0700531
532 A special task is create through schedule_special_tasks, but only if
533 the host doesn't already have an agent. This happens through
534 add_agent_task. All special agent tasks are given a host on creation,
535 and a Null hqe. To create a SpecialAgentTask object, you need a
536 models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
537 object contains a hqe it's passed on to the special agent task, which
538 creates a HostQueueEntry and saves it as it's queue_entry.
539
showardd1195652009-12-08 22:21:02 +0000540 @param special_task: a models.SpecialTask instance
541 @returns an AgentTask to run this SpecialTask
542 """
543 self._assert_host_has_no_agent(special_task)
544
beeps5e2bb4a2013-10-28 11:26:45 -0700545 special_agent_task_classes = (prejob_task.CleanupTask,
546 prejob_task.VerifyTask,
547 prejob_task.RepairTask,
548 prejob_task.ResetTask,
549 prejob_task.ProvisionTask)
550
showardd1195652009-12-08 22:21:02 +0000551 for agent_task_class in special_agent_task_classes:
552 if agent_task_class.TASK_TYPE == special_task.task:
553 return agent_task_class(task=special_task)
554
Prashanth B0e960282014-05-13 19:38:28 -0700555 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800556 'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000557
558
559 def _register_pidfiles(self, agent_tasks):
560 for agent_task in agent_tasks:
561 agent_task.register_necessary_pidfiles()
562
563
564 def _recover_tasks(self, agent_tasks):
565 orphans = _drone_manager.get_orphaned_autoserv_processes()
566
567 for agent_task in agent_tasks:
568 agent_task.recover()
569 if agent_task.monitor and agent_task.monitor.has_process():
570 orphans.discard(agent_task.monitor.get_process())
571 self.add_agent_task(agent_task)
572
573 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000574
575
showard8cc058f2009-09-08 16:26:33 +0000576 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000577 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
578 % status):
showard0db3d432009-10-12 20:29:15 +0000579 if entry.status == status and not self.get_agents_for_entry(entry):
580 # The status can change during iteration, e.g., if job.run()
581 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000582 yield entry
583
584
showard6878e8b2009-07-20 22:37:45 +0000585 def _check_for_remaining_orphan_processes(self, orphans):
586 if not orphans:
587 return
588 subject = 'Unrecovered orphan autoserv processes remain'
589 message = '\n'.join(str(process) for process in orphans)
590 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000591
592 die_on_orphans = global_config.global_config.get_config_value(
593 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
594
595 if die_on_orphans:
596 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000597
showard170873e2009-01-07 00:22:26 +0000598
showard8cc058f2009-09-08 16:26:33 +0000599 def _recover_pending_entries(self):
600 for entry in self._get_unassigned_entries(
601 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000602 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000603 entry.on_pending()
604
605
showardb8900452009-10-12 20:31:01 +0000606 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000607 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000608 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
609 unrecovered_hqes = []
610 for queue_entry in queue_entries:
611 special_tasks = models.SpecialTask.objects.filter(
612 task__in=(models.SpecialTask.Task.CLEANUP,
613 models.SpecialTask.Task.VERIFY),
614 queue_entry__id=queue_entry.id,
615 is_complete=False)
616 if special_tasks.count() == 0:
617 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000618
showardb8900452009-10-12 20:31:01 +0000619 if unrecovered_hqes:
620 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700621 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000622 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000623 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000624
625
showard65db3932009-10-28 19:54:35 +0000626 def _get_prioritized_special_tasks(self):
627 """
628 Returns all queued SpecialTasks prioritized for repair first, then
629 cleanup, then verify.
beeps8bb1f7d2013-08-05 01:30:09 -0700630
631 @return: list of afe.models.SpecialTasks sorted according to priority.
showard65db3932009-10-28 19:54:35 +0000632 """
633 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
634 is_complete=False,
635 host__locked=False)
636 # exclude hosts with active queue entries unless the SpecialTask is for
637 # that queue entry
showard7e67b432010-01-20 01:13:04 +0000638 queued_tasks = models.SpecialTask.objects.add_join(
showardeab66ce2009-12-23 00:03:56 +0000639 queued_tasks, 'afe_host_queue_entries', 'host_id',
640 join_condition='afe_host_queue_entries.active',
showard7e67b432010-01-20 01:13:04 +0000641 join_from_key='host_id', force_left_join=True)
showard65db3932009-10-28 19:54:35 +0000642 queued_tasks = queued_tasks.extra(
showardeab66ce2009-12-23 00:03:56 +0000643 where=['(afe_host_queue_entries.id IS NULL OR '
644 'afe_host_queue_entries.id = '
645 'afe_special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000646
showard65db3932009-10-28 19:54:35 +0000647 # reorder tasks by priority
648 task_priority_order = [models.SpecialTask.Task.REPAIR,
649 models.SpecialTask.Task.CLEANUP,
Dan Shi07e09af2013-04-12 09:31:29 -0700650 models.SpecialTask.Task.VERIFY,
Alex Millerdfff2fd2013-05-28 13:05:06 -0700651 models.SpecialTask.Task.RESET,
652 models.SpecialTask.Task.PROVISION]
showard65db3932009-10-28 19:54:35 +0000653 def task_priority_key(task):
654 return task_priority_order.index(task.task)
655 return sorted(queued_tasks, key=task_priority_key)
656
657
showard65db3932009-10-28 19:54:35 +0000658 def _schedule_special_tasks(self):
659 """
660 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700661
662 Special tasks include PreJobTasks like verify, reset and cleanup.
663 They are created through _schedule_new_jobs and associated with a hqe
664 This method translates SpecialTasks to the appropriate AgentTask and
665 adds them to the dispatchers agents list, so _handle_agents can execute
666 them.
showard65db3932009-10-28 19:54:35 +0000667 """
668 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000669 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000670 continue
showardd1195652009-12-08 22:21:02 +0000671 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000672
673
showard170873e2009-01-07 00:22:26 +0000674 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000675 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000676 # should never happen
showarded2afea2009-07-07 20:54:07 +0000677 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000678 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000679 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700680 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000681 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000682
683
jadmanski0afbb632008-06-06 21:10:57 +0000684 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000685 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700686 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000687 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000688 if self.host_has_agent(host):
689 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000690 continue
showard8cc058f2009-09-08 16:26:33 +0000691 if self._host_has_scheduled_special_task(host):
692 # host will have a special task scheduled on the next cycle
693 continue
showard170873e2009-01-07 00:22:26 +0000694 if print_message:
showardb18134f2009-03-20 20:52:18 +0000695 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000696 models.SpecialTask.objects.create(
697 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000698 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000699
700
jadmanski0afbb632008-06-06 21:10:57 +0000701 def _recover_hosts(self):
702 # recover "Repair Failed" hosts
703 message = 'Reverifying dead host %s'
704 self._reverify_hosts_where("status = 'Repair Failed'",
705 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000706
707
showard04c82c52008-05-29 19:38:12 +0000708
showardb95b1bd2008-08-15 18:11:04 +0000709 def _get_pending_queue_entries(self):
beeps7d8a1b12013-10-29 17:58:34 -0700710 """
711 Fetch a list of new host queue entries.
712
713 The ordering of this list is important, as every new agent
714 we schedule can potentially contribute to the process count
715 on the drone, which has a static limit. The sort order
716 prioritizes jobs as follows:
717 1. High priority jobs: Based on the afe_job's priority
718 2. With hosts and metahosts: This will only happen if we don't
719 activate the hqe after assigning a host to it in
720 schedule_new_jobs.
721 3. With hosts but without metahosts: When tests are scheduled
722 through the frontend the owner of the job would have chosen
723 a host for it.
724 4. Without hosts but with metahosts: This is the common case of
725 a new test that needs a DUT. We assign a host and set it to
726 active so it shouldn't show up in case 2 on the next tick.
727 5. Without hosts and without metahosts: Hostless suite jobs, that
728 will result in new jobs that fall under category 4.
729
730 A note about the ordering of cases 3 and 4:
731 Prioritizing one case above the other leads to earlier acquisition
732 of the following resources: 1. process slots on the drone 2. machines.
733 - When a user schedules a job through the afe they choose a specific
734 host for it. Jobs with metahost can utilize any host that satisfies
735 the metahost criterion. This means that if we had scheduled 4 before
736 3 there is a good chance that a job which could've used another host,
737 will now use the host assigned to a metahost-less job. Given the
738 availability of machines in pool:suites, this almost guarantees
739 starvation for jobs scheduled through the frontend.
740 - Scheduling 4 before 3 also has its pros however, since a suite
741 has the concept of a time out, whereas users can wait. If we hit the
742 process count on the drone a suite can timeout waiting on the test,
743 but a user job generally has a much longer timeout, and relatively
744 harmless consequences.
745 The current ordering was chosed because it is more likely that we will
746 run out of machines in pool:suites than processes on the drone.
747
748 @returns A list of HQEs ordered according to sort_order.
749 """
750 sort_order = ('afe_jobs.priority DESC, '
751 'ISNULL(host_id), '
752 'ISNULL(meta_host), '
Alex Millerd3614042014-01-13 15:58:18 -0800753 'parent_job_id, '
beeps7d8a1b12013-10-29 17:58:34 -0700754 'job_id')
beeps7d8273b2013-11-06 09:44:34 -0800755 query=('NOT complete AND NOT active AND status="Queued"'
756 'AND NOT aborted')
jamesrenc44ae992010-02-19 00:12:54 +0000757 return list(scheduler_models.HostQueueEntry.fetch(
showardeab66ce2009-12-23 00:03:56 +0000758 joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
beeps7d8273b2013-11-06 09:44:34 -0800759 where=query, order_by=sort_order))
mbligh36768f02008-02-22 18:28:33 +0000760
761
showard89f84db2009-03-12 20:39:13 +0000762 def _refresh_pending_queue_entries(self):
763 """
764 Lookup the pending HostQueueEntries and call our HostScheduler
765 refresh() method given that list. Return the list.
766
767 @returns A list of pending HostQueueEntries sorted in priority order.
768 """
showard63a34772008-08-18 19:32:50 +0000769 queue_entries = self._get_pending_queue_entries()
770 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000771 return []
showardb95b1bd2008-08-15 18:11:04 +0000772
showard63a34772008-08-18 19:32:50 +0000773 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000774
showard89f84db2009-03-12 20:39:13 +0000775 return queue_entries
776
777
778 def _schedule_atomic_group(self, queue_entry):
779 """
780 Schedule the given queue_entry on an atomic group of hosts.
781
782 Returns immediately if there are insufficient available hosts.
783
784 Creates new HostQueueEntries based off of queue_entry for the
785 scheduled hosts and starts them all running.
786 """
787 # This is a virtual host queue entry representing an entire
788 # atomic group, find a group and schedule their hosts.
789 group_hosts = self._host_scheduler.find_eligible_atomic_group(
790 queue_entry)
791 if not group_hosts:
792 return
showardcbe6f942009-06-17 19:33:49 +0000793
794 logging.info('Expanding atomic group entry %s with hosts %s',
795 queue_entry,
796 ', '.join(host.hostname for host in group_hosts))
jamesren883492a2010-02-12 00:45:18 +0000797
showard89f84db2009-03-12 20:39:13 +0000798 for assigned_host in group_hosts[1:]:
799 # Create a new HQE for every additional assigned_host.
jamesrenc44ae992010-02-19 00:12:54 +0000800 new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000801 new_hqe.save()
jamesren883492a2010-02-12 00:45:18 +0000802 new_hqe.set_host(assigned_host)
803 self._run_queue_entry(new_hqe)
804
805 # The first assigned host uses the original HostQueueEntry
806 queue_entry.set_host(group_hosts[0])
807 self._run_queue_entry(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000808
809
showarda9545c02009-12-18 22:44:26 +0000810 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800811 """Schedule a hostless (suite) job.
812
813 @param queue_entry: The queue_entry representing the hostless job.
814 """
showarda9545c02009-12-18 22:44:26 +0000815 self.add_agent_task(HostlessQueueTask(queue_entry))
jamesren47bd7372010-03-13 00:58:17 +0000816 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000817
818
    def _schedule_host_job(self, host, queue_entry):
        """Schedules a job on the given host.

        1. Assign the host to the hqe, if it isn't already assigned.
        2. Create a SpecialAgentTask for the hqe.
        3. Activate the hqe.

        @param queue_entry: The job to schedule.
        @param host: The host to schedule the job on.
        """
        if self.host_has_agent(host):
            # The host is still busy with another agent; notify and leave
            # the HQE inactive so it can be handed a host on a later tick.
            host_agent_task = list(self._host_agents.get(host.id))[0].task
            subject = 'Host with agents assigned to an HQE'
            message = ('HQE: %s assigned host %s, but the host has '
                       'agent: %s for queue_entry %s. The HQE '
                       'will have to try and acquire a host next tick ' %
                       (queue_entry, host.hostname, host_agent_task,
                        host_agent_task.queue_entry))
            email_manager.manager.enqueue_notify_email(subject, message)
        else:
            if queue_entry.host_id is None:
                queue_entry.set_host(host)
            else:
                if host.id != queue_entry.host_id:
                    # The rdb must hand back either a fresh host or the one
                    # the HQE already owns; anything else is inconsistent.
                    raise rdb_utils.RDBException('The rdb returned host: %s '
                            'but the job:%s was already assigned a host: %s. ' %
                            (host.hostname, queue_entry.job_id,
                             queue_entry.host.hostname))
            queue_entry.update_field('active', True)
            self._run_queue_entry(queue_entry)
849
850
showard89f84db2009-03-12 20:39:13 +0000851 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700852 """
853 Find any new HQEs and call schedule_pre_job_tasks for it.
854
855 This involves setting the status of the HQE and creating a row in the
856 db corresponding the the special task, through
857 scheduler_models._queue_special_task. The new db row is then added as
858 an agent to the dispatcher through _schedule_special_tasks and
859 scheduled for execution on the drone through _handle_agents.
860 """
showard89f84db2009-03-12 20:39:13 +0000861 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000862
beepscc9fc702013-12-02 12:45:38 -0800863 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700864 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700865 new_jobs_with_hosts = 0
866 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800867 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700868 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000869
beepscc9fc702013-12-02 12:45:38 -0800870 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000871 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000872 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700873 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000874 else:
beepscc9fc702013-12-02 12:45:38 -0800875 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700876 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700877
beepsb255fc52013-10-13 23:28:54 -0700878 stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
beepscc9fc702013-12-02 12:45:38 -0800879 if not host_jobs:
880 return
881
882 hosts = rdb_lib.acquire_hosts(self._host_scheduler, host_jobs)
883 for host, queue_entry in zip(hosts, host_jobs):
884 if host:
885 self._schedule_host_job(host, queue_entry)
886 new_jobs_with_hosts = new_jobs_with_hosts + 1
887
beepsb255fc52013-10-13 23:28:54 -0700888 stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
889 stats.Gauge(key).send('new_jobs_without_hosts',
890 new_jobs_need_hosts - new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000891
892
showard8cc058f2009-09-08 16:26:33 +0000893 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700894 """
895 Adds agents to the dispatcher.
896
897 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
898 QueueTask for example, will have a job with a control file, and
899 the agent will have methods that poll, abort and check if the queue
900 task is finished. The dispatcher runs the agent_task, as well as
901 other agents in it's _agents member, through _handle_agents, by
902 calling the Agents tick().
903
904 This method creates an agent for each HQE in one of (starting, running,
905 gathering, parsing, archiving) states, and adds it to the dispatcher so
906 it is handled by _handle_agents.
907 """
showardd1195652009-12-08 22:21:02 +0000908 for agent_task in self._get_queue_entry_agent_tasks():
909 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000910
911
912 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000913 for entry in scheduler_models.HostQueueEntry.fetch(
914 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000915 task = entry.job.schedule_delayed_callback_task(entry)
916 if task:
showardd1195652009-12-08 22:21:02 +0000917 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000918
919
    def _run_queue_entry(self, queue_entry):
        # Kick off the prejob task chain (e.g. verify/reset/cleanup) for an
        # HQE that has just been given a host; the resulting SpecialTasks
        # are later picked up by _schedule_special_tasks.
        self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
                            queue_entry)
        queue_entry.schedule_pre_job_tasks()
mblighd5c95802008-03-05 00:33:46 +0000924
925
    def _find_aborting(self):
        """
        Looks through the afe_host_queue_entries for an aborted entry.

        The aborted bit is set on an HQE in many ways, the most common
        being when a user requests an abort through the frontend, which
        results in an rpc from the afe to abort_host_queue_entries.
        """
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted=1 and complete=0'):
            logging.info('Aborting %s', entry)

            # The task would have started off with both is_complete and
            # is_active = False. Aborted tasks are neither active nor complete.
            # For all currently active tasks this will happen through the agent,
            # but we need to manually update the special tasks that haven't
            # started yet, because they don't have agents.
            models.SpecialTask.objects.filter(is_active=False,
                    queue_entry_id=entry.id).update(is_complete=True)

            # Abort every agent working on this entry, then let the entry
            # clean up its own state.
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            # Collect jobs in a set so each is stopped at most once below.
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000954
955
    def _find_aborted_special_tasks(self):
        """
        Find SpecialTasks that have been marked for abortion.

        Poll the database looking for SpecialTasks that are active
        and have been marked for abortion, then abort them.
        """

        # The completed and active bits are very important when it comes
        # to scheduler correctness. The active bit is set through the prolog
        # of a special task, and reset through the cleanup method of the
        # SpecialAgentTask. The cleanup is called both through the abort and
        # epilog. The complete bit is set in several places, and in general
        # a hanging job will have is_active=1 is_complete=0, while a special
        # task which completed will have is_active=0 is_complete=1. To check
        # aborts we directly check active because the complete bit is set in
        # several places, including the epilog of agent tasks.
        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
                                                          is_aborted=True)
        for task in aborted_tasks:
            # There are 2 ways to get the agent associated with a task,
            # through the host and through the hqe. A special task
            # always needs a host, but doesn't always need a hqe.
            for agent in self._host_agents.get(task.host.id, []):
                if isinstance(agent.task, agent_task.SpecialAgentTask):

                    # The epilog performs critical actions such as
                    # queueing the next SpecialTask, requeuing the
                    # hqe etc, however it doesn't actually kill the
                    # monitor process and set the 'done' bit. Epilogs
                    # assume that the job failed, and that the monitor
                    # process has already written an exit code. The
                    # done bit is a necessary condition for
                    # _handle_agents to schedule any more special
                    # tasks against the host, and it must be set
                    # in addition to is_active, is_complete and success.
                    agent.task.epilog()
                    agent.task.abort()
994
995
showard324bf812009-01-20 23:23:38 +0000996 def _can_start_agent(self, agent, num_started_this_cycle,
997 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000998 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000999 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +00001000 return True
1001 # don't allow any nonzero-process agents to run after we've reached a
1002 # limit (this avoids starvation of many-process agents)
1003 if have_reached_limit:
1004 return False
1005 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001006 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +00001007 agent.task.owner_username,
1008 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +00001009 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001010 return False
1011 # if a single agent exceeds the per-cycle throttling, still allow it to
1012 # run when it's the first agent in the cycle
1013 if num_started_this_cycle == 0:
1014 return True
1015 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +00001016 if (num_started_this_cycle + agent.task.num_processes >
1017 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001018 return False
1019 return True
1020
1021
    def _handle_agents(self):
        """
        Handles agents of the dispatcher.

        Appropriate Agents are added to the dispatcher through
        _schedule_running_host_queue_entries. These agents each
        have a task. This method runs the agents task through
        agent.tick() leading to:
            agent.start
                prolog -> AgentTasks prolog
                For each queue entry:
                    sets host status/status to Running
                    set started_on in afe_host_queue_entries
                run -> AgentTasks run
                Creates PidfileRunMonitor
                Queues the autoserv command line for this AgentTask
                via the drone manager. These commands are executed
                through the drone managers execute actions.
            poll -> AgentTasks/BaseAgentTask poll
                checks the monitors exit_code.
                Executes epilog if task is finished.
                Executes AgentTasks _finish_task
                finish_task is usually responsible for setting the status
                of the HQE/host, and updating it's active and complete fields.

            agent.is_done
                Removed the agent from the dispatchers _agents queue.
                Is_done checks the finished bit on the agent, that is
                set based on the Agents task. During the agents poll
                we check to see if the monitor process has exited in
                it's finish method, and set the success member of the
                task based on this exit code.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                # Not-yet-started agents must clear throttling first;
                # already-started agents are always ticked.
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_cycle += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes. %d added this cycle.',
                     _drone_manager.total_running_processes(),
                     num_started_this_cycle)
mbligh36768f02008-02-22 18:28:33 +00001079
1080
showard29f7cd22009-04-29 21:16:24 +00001081 def _process_recurring_runs(self):
1082 recurring_runs = models.RecurringRun.objects.filter(
1083 start_date__lte=datetime.datetime.now())
1084 for rrun in recurring_runs:
1085 # Create job from template
1086 job = rrun.job
1087 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001088 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001089
1090 host_objects = info['hosts']
1091 one_time_hosts = info['one_time_hosts']
1092 metahost_objects = info['meta_hosts']
1093 dependencies = info['dependencies']
1094 atomic_group = info['atomic_group']
1095
1096 for host in one_time_hosts or []:
1097 this_host = models.Host.create_one_time_host(host.hostname)
1098 host_objects.append(this_host)
1099
1100 try:
1101 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001102 options=options,
showard29f7cd22009-04-29 21:16:24 +00001103 host_objects=host_objects,
1104 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001105 atomic_group=atomic_group)
1106
1107 except Exception, ex:
1108 logging.exception(ex)
1109 #TODO send email
1110
1111 if rrun.loop_count == 1:
1112 rrun.delete()
1113 else:
1114 if rrun.loop_count != 0: # if not infinite loop
1115 # calculate new start_date
1116 difference = datetime.timedelta(seconds=rrun.loop_period)
1117 rrun.start_date = rrun.start_date + difference
1118 rrun.loop_count -= 1
1119 rrun.save()
1120
1121
# Allow a site-specific subclass of BaseDispatcher to be supplied through
# site_monitor_db; falls back to BaseDispatcher when no site class exists.
SiteDispatcher = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    """Concrete dispatcher class used by the scheduler entry point."""
    pass
1128
1129
class Agent(object):
    """
    Wraps a single AgentTask for use by the Dispatcher.

    The wrapper ties the task to the queue_entry and host ids it covers and
    tracks whether the task has been started and has finished.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state. If the task succeeded.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested. The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: An instance of an AgentTask.
        """
        self.task = task

        # Filled in later by Dispatcher.add_agent().
        self.dispatcher = None

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        """Poll the wrapped task once; record completion when it is done."""
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        """@returns True once the wrapped task has completed or aborted."""
        return self.finished


    def abort(self):
        """Forward an abort request to the wrapped task.

        Tasks may choose to ignore aborts; the agent only finishes when the
        task actually set its aborted flag.
        """
        if self.task:
            self.task.abort()
            if self.task.aborted:
                self.finished = True
showardd3dc1992009-04-22 21:01:40 +00001186
beeps5e2bb4a2013-10-28 11:26:45 -07001187class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001188 """
1189 Common functionality for QueueTask and HostlessQueueTask
1190 """
    def __init__(self, queue_entries):
        """
        @param queue_entries: Nonempty list of HostQueueEntries this task
                runs; presumably all entries belong to the same job —
                the first entry's job is used for the whole task.
        """
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001195
1196
    def _keyval_path(self):
        # Keyvals are written into this task's working (results) directory.
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001199
1200
jamesrenc44ae992010-02-19 00:12:54 +00001201 def _write_control_file(self, execution_path):
1202 control_path = _drone_manager.attach_file_to_execution(
1203 execution_path, self.job.control_file)
1204 return control_path
1205
1206
Aviv Keshet308e7362013-05-21 14:43:16 -07001207 # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _command_line(self):
        """Build the autoserv command-line parameters for this task.

        @returns A list of command-line arguments for autoserv.
        """
        execution_path = self.queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        # Hostless entries contribute no hostname to the comma-joined list.
        hostnames = ','.join(entry.host.hostname
                             for entry in self.queue_entries
                             if not entry.is_hostless())

        execution_tag = self.queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self.job, verbose=False)
        # Image-update jobs additionally pass the update image to autoserv.
        if self.job.is_image_update_job():
            params += ['--image', self.job.update_image_path]

        return params
showardd1195652009-12-08 22:21:02 +00001225
1226
    @property
    def num_processes(self):
        # One process slot is accounted per queue entry in this task.
        return len(self.queue_entries)
1230
1231
    @property
    def owner_username(self):
        # The job's owner; used e.g. for per-user process throttling.
        return self.job.owner
1235
1236
    def _working_directory(self):
        # The execution path shared by all of this task's queue entries.
        return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001239
1240
    def prolog(self):
        """Write pre-job keyvals and mark all queue entries as Running."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = self.job.keyval_dict()
        keyval_dict[queued_key] = queued_time
        # Record the host group name when the entries ran as a group.
        group_name = self.queue_entries[0].get_group_name()
        if group_name:
            keyval_dict['host_group_name'] = group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001252
1253
showard35162b02009-03-03 02:17:30 +00001254 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001255 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001256 _drone_manager.write_lines_to_file(error_file_path,
1257 [_LOST_PROCESS_ERROR])
1258
1259
showardd3dc1992009-04-22 21:01:40 +00001260 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001261 if not self.monitor:
1262 return
1263
showardd9205182009-04-27 20:09:55 +00001264 self._write_job_finished()
1265
showard35162b02009-03-03 02:17:30 +00001266 if self.monitor.lost_process:
1267 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001268
jadmanskif7fa2cc2008-10-01 14:13:23 +00001269
showardcbd74612008-11-19 21:42:02 +00001270 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001271 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001272 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001273 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001274 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001275
1276
jadmanskif7fa2cc2008-10-01 14:13:23 +00001277 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001278 if not self.monitor or not self.monitor.has_process():
1279 return
1280
jadmanskif7fa2cc2008-10-01 14:13:23 +00001281 # build up sets of all the aborted_by and aborted_on values
1282 aborted_by, aborted_on = set(), set()
1283 for queue_entry in self.queue_entries:
1284 if queue_entry.aborted_by:
1285 aborted_by.add(queue_entry.aborted_by)
1286 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1287 aborted_on.add(t)
1288
1289 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001290 # TODO(showard): this conditional is now obsolete, we just need to leave
1291 # it in temporarily for backwards compatibility over upgrades. delete
1292 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001293 assert len(aborted_by) <= 1
1294 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001295 aborted_by_value = aborted_by.pop()
1296 aborted_on_value = max(aborted_on)
1297 else:
1298 aborted_by_value = 'autotest_system'
1299 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001300
showarda0382352009-02-11 23:36:43 +00001301 self._write_keyval_after_job("aborted_by", aborted_by_value)
1302 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001303
showardcbd74612008-11-19 21:42:02 +00001304 aborted_on_string = str(datetime.datetime.fromtimestamp(
1305 aborted_on_value))
1306 self._write_status_comment('Job aborted by %s on %s' %
1307 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001308
1309
    def abort(self):
        """Abort this task and finalize its results.

        The parent abort tears down the running process first; only after
        that does it make sense to log the abort and write final keyvals.
        """
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()
showard21baa452008-10-21 00:08:39 +00001314
1315
    def epilog(self):
        """Normal-completion hook: run the parent epilog, then finalize."""
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001319
1320
class QueueTask(AbstractQueueTask):
    """Queue task for job entries that run against real hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        # Sanity-check DB state before mutating anything: entries must be
        # Starting/Running and their hosts Pending/Running.
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)


    def _finish_task(self):
        super(QueueTask, self)._finish_task()

        # Entries move on to log gathering; their hosts remain Running.
        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        base_command = super(QueueTask, self)._command_line()
        return base_command + ['--verify_job_repo_url']
1354
1355
class HostlessQueueTask(AbstractQueueTask):
    """Queue task for server-side (hostless) job entries."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        # Hostless entries have no per-host subdir; use a fixed one so the
        # usual execution-path machinery still works.
        self.queue_entries[0].update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()

        # Jobs enter the database in Starting status; each scheduler tick
        # promotes as many Starting jobs as limits permit (e.g.
        # max_hostless_jobs_per_drone, max_jobs_started_per_cycle) to
        # Running, launching an autoserv process on a drone for each.
        # An entry still in Starting therefore never had a process, so there
        # are no logs to parse or collect -- and attempting to would raise,
        # because its execution_subdir has no value yet. Only entries that
        # actually ran move on to Parsing.
        hqe = self.queue_entries[0]
        if hqe.status != models.HostQueueEntry.Status.STARTING:
            hqe.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001384
1385
if __name__ == '__main__':
    # Entry point: main() (defined earlier in this module) runs the
    # scheduler's setup and tick loop.
    main()