blob: 4c320e7056d2b131f712021251db24b7d93f88d8 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
Aviv Keshet225bdfe2013-03-05 10:10:08 -08002#pylint: disable-msg=C0111
mbligh36768f02008-02-22 18:28:33 +00003
4"""
5Autotest scheduler
6"""
showard909c7a62008-07-15 21:52:38 +00007
Aviv Keshet225bdfe2013-03-05 10:10:08 -08008import datetime, optparse, os, signal
beeps5e2bb4a2013-10-28 11:26:45 -07009import sys, time
Aviv Keshet225bdfe2013-03-05 10:10:08 -080010import logging, gc
showard402934a2009-12-21 22:20:47 +000011
Alex Miller05d7b4c2013-03-04 07:49:38 -080012import common
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000014
15import django.db
16
showard136e6dc2009-06-10 19:38:49 +000017from autotest_lib.client.common_lib import global_config, logging_manager
beeps5e2bb4a2013-10-28 11:26:45 -070018from autotest_lib.client.common_lib import utils
showardb1e51872008-10-07 11:08:18 +000019from autotest_lib.database import database_connection
Alex Miller05d7b4c2013-03-04 07:49:38 -080020from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
beeps5e2bb4a2013-10-28 11:26:45 -070021from autotest_lib.scheduler import agent_task, drone_manager, drones
22from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
23from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
24from autotest_lib.scheduler import postjob_task, scheduler_logging_config
beepscc9fc702013-12-02 12:45:38 -080025from autotest_lib.scheduler import rdb_lib
26from autotest_lib.scheduler import rdb_utils
jamesrenc44ae992010-02-19 00:12:54 +000027from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080028from autotest_lib.scheduler import status_server, scheduler_config
Aviv Keshet308e7362013-05-21 14:43:16 -070029from autotest_lib.server import autoserv_utils
Fang Deng1d6c2a02013-04-17 15:25:45 -070030from autotest_lib.site_utils.graphite import stats
Alex Miller05d7b4c2013-03-04 07:49:38 -080031
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the environment to override the computed install path.
# ('in' replaces the deprecated dict.has_key(), which was removed in Python 3.)
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None
mbligh36768f02008-02-22 18:28:33 +000062
def _parser_path_default(install_dir):
    """Return the default path to the TKO results parser under install_dir."""
    return os.path.join(install_dir, 'tko', 'parse')
# Allow a site-specific module to override the parser path lookup; falls back
# to the default above when no site implementation is present.
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
69
mbligh36768f02008-02-22 18:28:33 +000070
mbligh83c1e9e2009-05-01 23:10:41 +000071def _site_init_monitor_db_dummy():
72 return {}
73
74
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled but no default is set."""
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise host_scheduler.SchedulerError(
            'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000080
81
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler.

    Raises a SchedulerError if drone sets are misconfigured.
    """
    _verify_default_drone_set_exists()
85
86
def main():
    """Scheduler entry point.

    Logs any exception that escapes the main loop (SystemExit is re-raised
    untouched) and always removes the scheduler pid file on the way out.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000098
99
def main_without_exception_handling():
    """Parse options, configure the process, and run the dispatcher loop.

    Exits early (status 1) when the scheduler is disabled in global_config.
    The loop runs until a SIGINT sets _shutdown or the status server requests
    a shutdown; teardown (emails, server, drones, db) always runs afterwards.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Run the optional site-specific initialization hook.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # Report the stacktrace by email; teardown below still runs.
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000166
167
def setup_logging():
    """Configure scheduler logging, honoring optional environment overrides
    for the log directory and log file name."""
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(),
            log_dir=os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR'),
            logfile_name=os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME'))
174
175
def handle_sigint(signum, frame):
    """SIGINT handler: flag the main loop to exit after the current tick.

    @param signum: signal number (unused).
    @param frame: interrupted stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000180
181
def initialize():
    """One-time process setup before the dispatcher loop starts.

    Aborts if another monitor_db is already running, writes our pid file,
    connects the database, installs the SIGINT handler, and initializes the
    drone manager from global_config. Statement order matters here.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start a second scheduler against the same pid file.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    # Drone and results hosts come from global_config; default to localhost.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000219
220
def initialize_globals():
    """Bind the module-level _drone_manager to the DroneManager singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
224
225
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """Build the autoserv command line for a job.

    @param machines: string - a machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args: list - additional arguments to pass to autoserv.
    @param job: Job object - if supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry: HostQueueEntry object - if supplied and no Job object
            was supplied, this is used to look up the Job object.
    @returns the autoserv command line as a list of executable + parameters.
    """
    command = autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
    return command
showard87ba02a2009-04-20 19:37:32 +0000243
244
Simran Basia858a232012-08-21 11:04:37 -0700245class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800246
247
    def __init__(self):
        # Active Agent objects being driven by this dispatcher.
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # Maps host id -> set of agents, and queue entry id -> set of agents.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        # How often (seconds) to force a gc pass and log collector stats.
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        # Debug-logging knobs, both off by default.
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)
mbligh36768f02008-02-22 18:28:33 +0000270
mbligh36768f02008-02-22 18:28:33 +0000271
    def initialize(self, recover_hosts=True):
        """Run startup recovery before the first tick.

        @param recover_hosts: if True, also try to recover dead hosts.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()
283
mbligh36768f02008-02-22 18:28:33 +0000284
Simran Basi0ec94dd2012-08-28 09:50:10 -0700285 def _log_tick_msg(self, msg):
286 if self._tick_debug:
287 logging.debug(msg)
288
289
Simran Basidef92872012-09-20 13:34:34 -0700290 def _log_extra_msg(self, msg):
291 if self._extra_debugging:
292 logging.debug(msg)
293
294
    def tick(self):
        """
        Run one full scheduler iteration.

        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time. The call order below is significant: recovery and
        cleanup run before scheduling, and drone actions are flushed last.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000336
showard97aed502008-11-04 02:01:24 +0000337
    def _run_cleanup(self):
        """Give both cleanup handlers a chance to run; each self-throttles."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000341
mbligh36768f02008-02-22 18:28:33 +0000342
showardf13a9e22009-12-18 22:54:09 +0000343 def _garbage_collection(self):
344 threshold_time = time.time() - self._seconds_between_garbage_stats
345 if threshold_time < self._last_garbage_stats_time:
346 # Don't generate these reports very often.
347 return
348
349 self._last_garbage_stats_time = time.time()
350 # Force a full level 0 collection (because we can, it doesn't hurt
351 # at this interval).
352 gc.collect()
353 logging.info('Logging garbage collector stats on tick %d.',
354 self._tick_count)
355 gc_stats._log_garbage_collector_stats()
356
357
showard170873e2009-01-07 00:22:26 +0000358 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
359 for object_id in object_ids:
360 agent_dict.setdefault(object_id, set()).add(agent)
361
362
363 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
364 for object_id in object_ids:
365 assert object_id in agent_dict
366 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700367 # If an ID has no more active agent associated, there is no need to
368 # keep it in the dictionary. Otherwise, scheduler will keep an
369 # unnecessarily big dictionary until being restarted.
370 if not agent_dict[object_id]:
371 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000372
373
    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatcher's list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to 1. a dict against the queue_entry_ids given to it 2. A dict
        against the host_ids given to it. So theoretically, a host can have any
        number of agents associated with it, and each of them can have any
        special agent task, though in practice we never see > 1 agent/task per
        host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        # Give the agent a back-reference so it can remove itself later.
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000394
showard170873e2009-01-07 00:22:26 +0000395
396 def get_agents_for_entry(self, queue_entry):
397 """
398 Find agents corresponding to the specified queue_entry.
399 """
showardd3dc1992009-04-22 21:01:40 +0000400 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000401
402
403 def host_has_agent(self, host):
404 """
405 Determine if there is currently an Agent present using this host.
406 """
407 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000408
409
    def remove_agent(self, agent):
        """Forget a finished agent: drop it from the agent list and from the
        host and queue-entry lookup maps."""
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000416
417
    def _host_has_scheduled_special_task(self, host):
        """Return True if host has a SpecialTask queued but not yet started
        (not active, not complete)."""
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))
422
423
    def _recover_processes(self):
        """Reattach to processes left over from a previous scheduler run.

        Order matters: pidfiles must be registered before the drone refresh
        so their contents are fetched, and drones are only reinitialized after
        orphaned processes have been handled.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000436
showard170873e2009-01-07 00:22:26 +0000437
showardd1195652009-12-08 22:21:02 +0000438 def _create_recovery_agent_tasks(self):
439 return (self._get_queue_entry_agent_tasks()
440 + self._get_special_task_agent_tasks(is_active=True))
441
442
443 def _get_queue_entry_agent_tasks(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700444 """
445 Get agent tasks for all hqe in the specified states.
446
447 Loosely this translates to taking a hqe in one of the specified states,
448 say parsing, and getting an AgentTask for it, like the FinalReparseTask,
449 through _get_agent_task_for_queue_entry. Each queue entry can only have
450 one agent task at a time, but there might be multiple queue entries in
451 the group.
452
453 @return: A list of AgentTasks.
454 """
showardd1195652009-12-08 22:21:02 +0000455 # host queue entry statuses handled directly by AgentTasks (Verifying is
456 # handled through SpecialTasks, so is not listed here)
457 statuses = (models.HostQueueEntry.Status.STARTING,
458 models.HostQueueEntry.Status.RUNNING,
459 models.HostQueueEntry.Status.GATHERING,
mbligh4608b002010-01-05 18:22:35 +0000460 models.HostQueueEntry.Status.PARSING,
461 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000462 status_list = ','.join("'%s'" % status for status in statuses)
jamesrenc44ae992010-02-19 00:12:54 +0000463 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardd1195652009-12-08 22:21:02 +0000464 where='status IN (%s)' % status_list)
Alex Miller47cd2472013-11-25 15:20:04 -0800465 stats.Gauge('scheduler.jobs_per_tick').send(
466 'running', len(queue_entries))
showardd1195652009-12-08 22:21:02 +0000467
468 agent_tasks = []
469 used_queue_entries = set()
470 for entry in queue_entries:
471 if self.get_agents_for_entry(entry):
472 # already being handled
473 continue
474 if entry in used_queue_entries:
475 # already picked up by a synchronous job
476 continue
477 agent_task = self._get_agent_task_for_queue_entry(entry)
478 agent_tasks.append(agent_task)
479 used_queue_entries.update(agent_task.queue_entries)
480 return agent_tasks
showard170873e2009-01-07 00:22:26 +0000481
482
showardd1195652009-12-08 22:21:02 +0000483 def _get_special_task_agent_tasks(self, is_active=False):
484 special_tasks = models.SpecialTask.objects.filter(
485 is_active=is_active, is_complete=False)
486 return [self._get_agent_task_for_special_task(task)
487 for task in special_tasks]
488
489
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        @raises host_scheduler.SchedulerError: if the entry's status is not
                one this method knows how to handle.
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        # Dispatch on the entry's status to the matching AgentTask class.
        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000515
516
517 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000518 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
519 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000520 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000521 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000522 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000523 if using_host:
showardd1195652009-12-08 22:21:02 +0000524 self._assert_host_has_no_agent(task_entry)
525
526
527 def _assert_host_has_no_agent(self, entry):
528 """
529 @param entry: a HostQueueEntry or a SpecialTask
530 """
531 if self.host_has_agent(entry.host):
532 agent = tuple(self._host_agents.get(entry.host.id))[0]
Dale Curtisaa513362011-03-01 17:27:44 -0800533 raise host_scheduler.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000534 'While scheduling %s, host %s already has a host agent %s'
535 % (entry, entry.host, agent.task))
536
537
    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
        object contains a hqe it's passed on to the special agent task, which
        creates a HostQueueEntry and saves it as its queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        @raises host_scheduler.SchedulerError: if no AgentTask class matches
                the task's type.
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        # Each AgentTask class declares the SpecialTask type it handles.
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000568
569
570 def _register_pidfiles(self, agent_tasks):
571 for agent_task in agent_tasks:
572 agent_task.register_necessary_pidfiles()
573
574
    def _recover_tasks(self, agent_tasks):
        """Re-attach recovered AgentTasks and flag orphaned autoserv runs.

        Every autoserv process claimed by a recovered task is removed from
        the orphan set; whatever remains is reported (and may abort the
        scheduler, depending on configuration).
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000585
586
    def _get_unassigned_entries(self, status):
        """Yield HostQueueEntries currently in `status` with no agent.

        @param status: a models.HostQueueEntry.Status value.
        """
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                           % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry
594
595
    def _check_for_remaining_orphan_processes(self, orphans):
        """Email a report about unclaimed autoserv processes.

        @param orphans: set of processes no recovered task claimed.
        @raises RuntimeError: if die_on_orphans is enabled in global_config
                and any orphans remain.
        """
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000608
showard170873e2009-01-07 00:22:26 +0000609
    def _recover_pending_entries(self):
        """Re-trigger on_pending() for Pending entries that have no agent."""
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()
615
616
showardb8900452009-10-12 20:31:01 +0000617 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000618 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000619 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
620 unrecovered_hqes = []
621 for queue_entry in queue_entries:
622 special_tasks = models.SpecialTask.objects.filter(
623 task__in=(models.SpecialTask.Task.CLEANUP,
624 models.SpecialTask.Task.VERIFY),
625 queue_entry__id=queue_entry.id,
626 is_complete=False)
627 if special_tasks.count() == 0:
628 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000629
showardb8900452009-10-12 20:31:01 +0000630 if unrecovered_hqes:
631 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Dale Curtisaa513362011-03-01 17:27:44 -0800632 raise host_scheduler.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000633 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000634 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000635
636
showard65db3932009-10-28 19:54:35 +0000637 def _get_prioritized_special_tasks(self):
638 """
639 Returns all queued SpecialTasks prioritized for repair first, then
640 cleanup, then verify.
beeps8bb1f7d2013-08-05 01:30:09 -0700641
642 @return: list of afe.models.SpecialTasks sorted according to priority.
showard65db3932009-10-28 19:54:35 +0000643 """
644 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
645 is_complete=False,
646 host__locked=False)
647 # exclude hosts with active queue entries unless the SpecialTask is for
648 # that queue entry
showard7e67b432010-01-20 01:13:04 +0000649 queued_tasks = models.SpecialTask.objects.add_join(
showardeab66ce2009-12-23 00:03:56 +0000650 queued_tasks, 'afe_host_queue_entries', 'host_id',
651 join_condition='afe_host_queue_entries.active',
showard7e67b432010-01-20 01:13:04 +0000652 join_from_key='host_id', force_left_join=True)
showard65db3932009-10-28 19:54:35 +0000653 queued_tasks = queued_tasks.extra(
showardeab66ce2009-12-23 00:03:56 +0000654 where=['(afe_host_queue_entries.id IS NULL OR '
655 'afe_host_queue_entries.id = '
656 'afe_special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000657
showard65db3932009-10-28 19:54:35 +0000658 # reorder tasks by priority
659 task_priority_order = [models.SpecialTask.Task.REPAIR,
660 models.SpecialTask.Task.CLEANUP,
Dan Shi07e09af2013-04-12 09:31:29 -0700661 models.SpecialTask.Task.VERIFY,
Alex Millerdfff2fd2013-05-28 13:05:06 -0700662 models.SpecialTask.Task.RESET,
663 models.SpecialTask.Task.PROVISION]
showard65db3932009-10-28 19:54:35 +0000664 def task_priority_key(task):
665 return task_priority_order.index(task.task)
666 return sorted(queued_tasks, key=task_priority_key)
667
668
showard65db3932009-10-28 19:54:35 +0000669 def _schedule_special_tasks(self):
670 """
671 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700672
673 Special tasks include PreJobTasks like verify, reset and cleanup.
674 They are created through _schedule_new_jobs and associated with a hqe
675 This method translates SpecialTasks to the appropriate AgentTask and
676 adds them to the dispatchers agents list, so _handle_agents can execute
677 them.
showard65db3932009-10-28 19:54:35 +0000678 """
679 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000680 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000681 continue
showardd1195652009-12-08 22:21:02 +0000682 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000683
684
showard170873e2009-01-07 00:22:26 +0000685 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000686 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000687 # should never happen
showarded2afea2009-07-07 20:54:07 +0000688 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000689 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000690 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700691 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000692 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000693
694
jadmanski0afbb632008-06-06 21:10:57 +0000695 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000696 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700697 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000698 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000699 if self.host_has_agent(host):
700 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000701 continue
showard8cc058f2009-09-08 16:26:33 +0000702 if self._host_has_scheduled_special_task(host):
703 # host will have a special task scheduled on the next cycle
704 continue
showard170873e2009-01-07 00:22:26 +0000705 if print_message:
showardb18134f2009-03-20 20:52:18 +0000706 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000707 models.SpecialTask.objects.create(
708 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000709 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000710
711
jadmanski0afbb632008-06-06 21:10:57 +0000712 def _recover_hosts(self):
713 # recover "Repair Failed" hosts
714 message = 'Reverifying dead host %s'
715 self._reverify_hosts_where("status = 'Repair Failed'",
716 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000717
718
showard04c82c52008-05-29 19:38:12 +0000719
showardb95b1bd2008-08-15 18:11:04 +0000720 def _get_pending_queue_entries(self):
beeps7d8a1b12013-10-29 17:58:34 -0700721 """
722 Fetch a list of new host queue entries.
723
724 The ordering of this list is important, as every new agent
725 we schedule can potentially contribute to the process count
726 on the drone, which has a static limit. The sort order
727 prioritizes jobs as follows:
728 1. High priority jobs: Based on the afe_job's priority
729 2. With hosts and metahosts: This will only happen if we don't
730 activate the hqe after assigning a host to it in
731 schedule_new_jobs.
732 3. With hosts but without metahosts: When tests are scheduled
733 through the frontend the owner of the job would have chosen
734 a host for it.
735 4. Without hosts but with metahosts: This is the common case of
736 a new test that needs a DUT. We assign a host and set it to
737 active so it shouldn't show up in case 2 on the next tick.
738 5. Without hosts and without metahosts: Hostless suite jobs, that
739 will result in new jobs that fall under category 4.
740
741 A note about the ordering of cases 3 and 4:
742 Prioritizing one case above the other leads to earlier acquisition
743 of the following resources: 1. process slots on the drone 2. machines.
744 - When a user schedules a job through the afe they choose a specific
745 host for it. Jobs with metahost can utilize any host that satisfies
746 the metahost criterion. This means that if we had scheduled 4 before
747 3 there is a good chance that a job which could've used another host,
748 will now use the host assigned to a metahost-less job. Given the
749 availability of machines in pool:suites, this almost guarantees
750 starvation for jobs scheduled through the frontend.
751 - Scheduling 4 before 3 also has its pros however, since a suite
752 has the concept of a time out, whereas users can wait. If we hit the
753 process count on the drone a suite can timeout waiting on the test,
754 but a user job generally has a much longer timeout, and relatively
755 harmless consequences.
756 The current ordering was chosed because it is more likely that we will
757 run out of machines in pool:suites than processes on the drone.
758
759 @returns A list of HQEs ordered according to sort_order.
760 """
761 sort_order = ('afe_jobs.priority DESC, '
762 'ISNULL(host_id), '
763 'ISNULL(meta_host), '
Alex Millerd3614042014-01-13 15:58:18 -0800764 'parent_job_id, '
beeps7d8a1b12013-10-29 17:58:34 -0700765 'job_id')
beeps7d8273b2013-11-06 09:44:34 -0800766 query=('NOT complete AND NOT active AND status="Queued"'
767 'AND NOT aborted')
jamesrenc44ae992010-02-19 00:12:54 +0000768 return list(scheduler_models.HostQueueEntry.fetch(
showardeab66ce2009-12-23 00:03:56 +0000769 joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
beeps7d8273b2013-11-06 09:44:34 -0800770 where=query, order_by=sort_order))
mbligh36768f02008-02-22 18:28:33 +0000771
772
showard89f84db2009-03-12 20:39:13 +0000773 def _refresh_pending_queue_entries(self):
774 """
775 Lookup the pending HostQueueEntries and call our HostScheduler
776 refresh() method given that list. Return the list.
777
778 @returns A list of pending HostQueueEntries sorted in priority order.
779 """
showard63a34772008-08-18 19:32:50 +0000780 queue_entries = self._get_pending_queue_entries()
781 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000782 return []
showardb95b1bd2008-08-15 18:11:04 +0000783
showard63a34772008-08-18 19:32:50 +0000784 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000785
showard89f84db2009-03-12 20:39:13 +0000786 return queue_entries
787
788
789 def _schedule_atomic_group(self, queue_entry):
790 """
791 Schedule the given queue_entry on an atomic group of hosts.
792
793 Returns immediately if there are insufficient available hosts.
794
795 Creates new HostQueueEntries based off of queue_entry for the
796 scheduled hosts and starts them all running.
797 """
798 # This is a virtual host queue entry representing an entire
799 # atomic group, find a group and schedule their hosts.
800 group_hosts = self._host_scheduler.find_eligible_atomic_group(
801 queue_entry)
802 if not group_hosts:
803 return
showardcbe6f942009-06-17 19:33:49 +0000804
805 logging.info('Expanding atomic group entry %s with hosts %s',
806 queue_entry,
807 ', '.join(host.hostname for host in group_hosts))
jamesren883492a2010-02-12 00:45:18 +0000808
showard89f84db2009-03-12 20:39:13 +0000809 for assigned_host in group_hosts[1:]:
810 # Create a new HQE for every additional assigned_host.
jamesrenc44ae992010-02-19 00:12:54 +0000811 new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000812 new_hqe.save()
jamesren883492a2010-02-12 00:45:18 +0000813 new_hqe.set_host(assigned_host)
814 self._run_queue_entry(new_hqe)
815
816 # The first assigned host uses the original HostQueueEntry
817 queue_entry.set_host(group_hosts[0])
818 self._run_queue_entry(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000819
820
showarda9545c02009-12-18 22:44:26 +0000821 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800822 """Schedule a hostless (suite) job.
823
824 @param queue_entry: The queue_entry representing the hostless job.
825 """
showarda9545c02009-12-18 22:44:26 +0000826 self.add_agent_task(HostlessQueueTask(queue_entry))
jamesren47bd7372010-03-13 00:58:17 +0000827 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000828
829
beepscc9fc702013-12-02 12:45:38 -0800830 def _schedule_host_job(self, host, queue_entry):
831 """Schedules a job on the given host.
832
833 1. Assign the host to the hqe, if it isn't already assigned.
834 2. Create a SpecialAgentTask for the hqe.
835 3. Activate the hqe.
836
837 @param queue_entry: The job to schedule.
838 @param host: The host to schedule the job on.
839 """
840 if self.host_has_agent(host):
841 host_agent_task = list(self._host_agents.get(host.id))[0].task
842 subject = 'Host with agents assigned to an HQE'
843 message = ('HQE: %s assigned host %s, but the host has '
844 'agent: %s for queue_entry %s. The HQE '
845 'will remain in a queued state till the '
846 'the host is usable.' %
847 (queue_entry, host.hostname, host_agent_task,
848 host_agent_task.queue_entry))
849 email_manager.manager.enqueue_notify_email(subject, message)
850 queue_entry.set_host(None)
851 queue_entry.update_field('active', False)
852 else:
853 if queue_entry.host_id is None:
854 queue_entry.set_host(host)
855 else:
856 if host.id != queue_entry.host_id:
857 raise rdb_utils.RDBException('The rdb returned host: %s '
858 'but the job:%s was already assigned a host: %s. ' %
859 (host.hostname, queue_entry.job_id,
860 queue_entry.host.hostname))
861 queue_entry.update_field('active', True)
862 self._run_queue_entry(queue_entry)
863
864
showard89f84db2009-03-12 20:39:13 +0000865 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700866 """
867 Find any new HQEs and call schedule_pre_job_tasks for it.
868
869 This involves setting the status of the HQE and creating a row in the
870 db corresponding the the special task, through
871 scheduler_models._queue_special_task. The new db row is then added as
872 an agent to the dispatcher through _schedule_special_tasks and
873 scheduled for execution on the drone through _handle_agents.
874 """
showard89f84db2009-03-12 20:39:13 +0000875 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000876
beepscc9fc702013-12-02 12:45:38 -0800877 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700878 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700879 new_jobs_with_hosts = 0
880 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800881 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700882 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000883
beepscc9fc702013-12-02 12:45:38 -0800884 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000885 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000886 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700887 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000888 else:
beepscc9fc702013-12-02 12:45:38 -0800889 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700890 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700891
beepsb255fc52013-10-13 23:28:54 -0700892 stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
beepscc9fc702013-12-02 12:45:38 -0800893 if not host_jobs:
894 return
895
896 hosts = rdb_lib.acquire_hosts(self._host_scheduler, host_jobs)
897 for host, queue_entry in zip(hosts, host_jobs):
898 if host:
899 self._schedule_host_job(host, queue_entry)
900 new_jobs_with_hosts = new_jobs_with_hosts + 1
901
beepsb255fc52013-10-13 23:28:54 -0700902 stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
903 stats.Gauge(key).send('new_jobs_without_hosts',
904 new_jobs_need_hosts - new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000905
906
showard8cc058f2009-09-08 16:26:33 +0000907 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700908 """
909 Adds agents to the dispatcher.
910
911 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
912 QueueTask for example, will have a job with a control file, and
913 the agent will have methods that poll, abort and check if the queue
914 task is finished. The dispatcher runs the agent_task, as well as
915 other agents in it's _agents member, through _handle_agents, by
916 calling the Agents tick().
917
918 This method creates an agent for each HQE in one of (starting, running,
919 gathering, parsing, archiving) states, and adds it to the dispatcher so
920 it is handled by _handle_agents.
921 """
showardd1195652009-12-08 22:21:02 +0000922 for agent_task in self._get_queue_entry_agent_tasks():
923 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000924
925
926 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000927 for entry in scheduler_models.HostQueueEntry.fetch(
928 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000929 task = entry.job.schedule_delayed_callback_task(entry)
930 if task:
showardd1195652009-12-08 22:21:02 +0000931 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000932
933
jamesren883492a2010-02-12 00:45:18 +0000934 def _run_queue_entry(self, queue_entry):
Simran Basidef92872012-09-20 13:34:34 -0700935 self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
936 queue_entry)
jamesren883492a2010-02-12 00:45:18 +0000937 queue_entry.schedule_pre_job_tasks()
mblighd5c95802008-03-05 00:33:46 +0000938
939
jadmanski0afbb632008-06-06 21:10:57 +0000940 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700941 """
942 Looks through the afe_host_queue_entries for an aborted entry.
943
944 The aborted bit is set on an HQE in many ways, the most common
945 being when a user requests an abort through the frontend, which
946 results in an rpc from the afe to abort_host_queue_entries.
947 """
jamesrene7c65cb2010-06-08 20:38:10 +0000948 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000949 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700950 where='aborted=1 and complete=0'):
showardf4a2e502009-07-28 20:06:39 +0000951 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800952
953 # The task would have started off with both is_complete and
954 # is_active = False. Aborted tasks are neither active nor complete.
955 # For all currently active tasks this will happen through the agent,
956 # but we need to manually update the special tasks that haven't
957 # started yet, because they don't have agents.
958 models.SpecialTask.objects.filter(is_active=False,
959 queue_entry_id=entry.id).update(is_complete=True)
960
showardd3dc1992009-04-22 21:01:40 +0000961 for agent in self.get_agents_for_entry(entry):
962 agent.abort()
963 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000964 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700965 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000966 for job in jobs_to_stop:
967 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000968
969
beeps8bb1f7d2013-08-05 01:30:09 -0700970 def _find_aborted_special_tasks(self):
971 """
972 Find SpecialTasks that have been marked for abortion.
973
974 Poll the database looking for SpecialTasks that are active
975 and have been marked for abortion, then abort them.
976 """
977
978 # The completed and active bits are very important when it comes
979 # to scheduler correctness. The active bit is set through the prolog
980 # of a special task, and reset through the cleanup method of the
981 # SpecialAgentTask. The cleanup is called both through the abort and
982 # epilog. The complete bit is set in several places, and in general
983 # a hanging job will have is_active=1 is_complete=0, while a special
984 # task which completed will have is_active=0 is_complete=1. To check
985 # aborts we directly check active because the complete bit is set in
986 # several places, including the epilog of agent tasks.
987 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
988 is_aborted=True)
989 for task in aborted_tasks:
990 # There are 2 ways to get the agent associated with a task,
991 # through the host and through the hqe. A special task
992 # always needs a host, but doesn't always need a hqe.
993 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700994 if isinstance(agent.task, agent_task.SpecialAgentTask):
beeps8bb1f7d2013-08-05 01:30:09 -0700995
996 # The epilog preforms critical actions such as
997 # queueing the next SpecialTask, requeuing the
998 # hqe etc, however it doesn't actually kill the
999 # monitor process and set the 'done' bit. Epilogs
1000 # assume that the job failed, and that the monitor
1001 # process has already written an exit code. The
1002 # done bit is a necessary condition for
1003 # _handle_agents to schedule any more special
1004 # tasks against the host, and it must be set
1005 # in addition to is_active, is_complete and success.
1006 agent.task.epilog()
1007 agent.task.abort()
1008
1009
showard324bf812009-01-20 23:23:38 +00001010 def _can_start_agent(self, agent, num_started_this_cycle,
1011 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001012 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +00001013 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +00001014 return True
1015 # don't allow any nonzero-process agents to run after we've reached a
1016 # limit (this avoids starvation of many-process agents)
1017 if have_reached_limit:
1018 return False
1019 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001020 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +00001021 agent.task.owner_username,
1022 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +00001023 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001024 return False
1025 # if a single agent exceeds the per-cycle throttling, still allow it to
1026 # run when it's the first agent in the cycle
1027 if num_started_this_cycle == 0:
1028 return True
1029 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +00001030 if (num_started_this_cycle + agent.task.num_processes >
1031 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001032 return False
1033 return True
1034
1035
jadmanski0afbb632008-06-06 21:10:57 +00001036 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -07001037 """
1038 Handles agents of the dispatcher.
1039
1040 Appropriate Agents are added to the dispatcher through
1041 _schedule_running_host_queue_entries. These agents each
1042 have a task. This method runs the agents task through
1043 agent.tick() leading to:
1044 agent.start
1045 prolog -> AgentTasks prolog
1046 For each queue entry:
1047 sets host status/status to Running
1048 set started_on in afe_host_queue_entries
1049 run -> AgentTasks run
1050 Creates PidfileRunMonitor
1051 Queues the autoserv command line for this AgentTask
1052 via the drone manager. These commands are executed
1053 through the drone managers execute actions.
1054 poll -> AgentTasks/BaseAgentTask poll
1055 checks the monitors exit_code.
1056 Executes epilog if task is finished.
1057 Executes AgentTasks _finish_task
1058 finish_task is usually responsible for setting the status
1059 of the HQE/host, and updating it's active and complete fileds.
1060
1061 agent.is_done
1062 Removed the agent from the dispatchers _agents queue.
1063 Is_done checks the finished bit on the agent, that is
1064 set based on the Agents task. During the agents poll
1065 we check to see if the monitor process has exited in
1066 it's finish method, and set the success member of the
1067 task based on this exit code.
1068 """
jadmanski0afbb632008-06-06 21:10:57 +00001069 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001070 have_reached_limit = False
1071 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -07001072 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +00001073 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -07001074 self._log_extra_msg('Processing Agent with Host Ids: %s and '
1075 'queue_entry ids:%s' % (agent.host_ids,
1076 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +00001077 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001078 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001079 have_reached_limit):
1080 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -07001081 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +00001082 continue
showardd1195652009-12-08 22:21:02 +00001083 num_started_this_cycle += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001084 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +00001085 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -07001086 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +00001087 if agent.is_done():
Simran Basidef92872012-09-20 13:34:34 -07001088 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +00001089 self.remove_agent(agent)
Simran Basi3f6717d2012-09-13 15:21:22 -07001090 logging.info('%d running processes. %d added this cycle.',
1091 _drone_manager.total_running_processes(),
1092 num_started_this_cycle)
mbligh36768f02008-02-22 18:28:33 +00001093
1094
showard29f7cd22009-04-29 21:16:24 +00001095 def _process_recurring_runs(self):
1096 recurring_runs = models.RecurringRun.objects.filter(
1097 start_date__lte=datetime.datetime.now())
1098 for rrun in recurring_runs:
1099 # Create job from template
1100 job = rrun.job
1101 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001102 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001103
1104 host_objects = info['hosts']
1105 one_time_hosts = info['one_time_hosts']
1106 metahost_objects = info['meta_hosts']
1107 dependencies = info['dependencies']
1108 atomic_group = info['atomic_group']
1109
1110 for host in one_time_hosts or []:
1111 this_host = models.Host.create_one_time_host(host.hostname)
1112 host_objects.append(this_host)
1113
1114 try:
1115 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001116 options=options,
showard29f7cd22009-04-29 21:16:24 +00001117 host_objects=host_objects,
1118 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001119 atomic_group=atomic_group)
1120
1121 except Exception, ex:
1122 logging.exception(ex)
1123 #TODO send email
1124
1125 if rrun.loop_count == 1:
1126 rrun.delete()
1127 else:
1128 if rrun.loop_count != 0: # if not infinite loop
1129 # calculate new start_date
1130 difference = datetime.timedelta(seconds=rrun.loop_period)
1131 rrun.start_date = rrun.start_date + difference
1132 rrun.loop_count -= 1
1133 rrun.save()
1134
1135
Simran Basia858a232012-08-21 11:04:37 -07001136SiteDispatcher = utils.import_site_class(
1137 __file__, 'autotest_lib.scheduler.site_monitor_db',
1138 'SiteDispatcher', BaseDispatcher)
1139
1140class Dispatcher(SiteDispatcher):
1141 pass
1142
1143
mbligh36768f02008-02-22 18:28:33 +00001144class Agent(object):
showard77182562009-06-10 00:16:05 +00001145 """
Alex Miller47715eb2013-07-24 03:34:01 -07001146 An agent for use by the Dispatcher class to perform a task. An agent wraps
1147 around an AgentTask mainly to associate the AgentTask with the queue_entry
1148 and host ids.
showard77182562009-06-10 00:16:05 +00001149
1150 The following methods are required on all task objects:
1151 poll() - Called periodically to let the task check its status and
1152 update its internal state. If the task succeeded.
1153 is_done() - Returns True if the task is finished.
1154 abort() - Called when an abort has been requested. The task must
1155 set its aborted attribute to True if it actually aborted.
1156
1157 The following attributes are required on all task objects:
1158 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001159 success - bool, True if this task succeeded.
1160 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1161 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001162 """
1163
1164
showard418785b2009-11-23 20:19:59 +00001165 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001166 """
Alex Miller47715eb2013-07-24 03:34:01 -07001167 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001168 """
showard8cc058f2009-09-08 16:26:33 +00001169 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001170
showard77182562009-06-10 00:16:05 +00001171 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001172 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001173
showard8cc058f2009-09-08 16:26:33 +00001174 self.queue_entry_ids = task.queue_entry_ids
1175 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001176
showard8cc058f2009-09-08 16:26:33 +00001177 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001178 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001179
1180
jadmanski0afbb632008-06-06 21:10:57 +00001181 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001182 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001183 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001184 self.task.poll()
1185 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001186 self.finished = True
showardec113162008-05-08 00:52:49 +00001187
1188
jadmanski0afbb632008-06-06 21:10:57 +00001189 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001190 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001191
1192
showardd3dc1992009-04-22 21:01:40 +00001193 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001194 if self.task:
1195 self.task.abort()
1196 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001197 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001198 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001199
showardd3dc1992009-04-22 21:01:40 +00001200
beeps5e2bb4a2013-10-28 11:26:45 -07001201class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001202 """
1203 Common functionality for QueueTask and HostlessQueueTask
1204 """
1205 def __init__(self, queue_entries):
1206 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001207 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001208 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001209
1210
showard73ec0442009-02-07 02:05:20 +00001211 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001212 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001213
1214
jamesrenc44ae992010-02-19 00:12:54 +00001215 def _write_control_file(self, execution_path):
1216 control_path = _drone_manager.attach_file_to_execution(
1217 execution_path, self.job.control_file)
1218 return control_path
1219
1220
Aviv Keshet308e7362013-05-21 14:43:16 -07001221 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001222 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001223 execution_path = self.queue_entries[0].execution_path()
1224 control_path = self._write_control_file(execution_path)
1225 hostnames = ','.join(entry.host.hostname
1226 for entry in self.queue_entries
1227 if not entry.is_hostless())
1228
1229 execution_tag = self.queue_entries[0].execution_tag()
1230 params = _autoserv_command_line(
1231 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001232 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001233 _drone_manager.absolute_path(control_path)],
1234 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001235 if self.job.is_image_update_job():
1236 params += ['--image', self.job.update_image_path]
1237
jamesrenc44ae992010-02-19 00:12:54 +00001238 return params
showardd1195652009-12-08 22:21:02 +00001239
1240
1241 @property
1242 def num_processes(self):
1243 return len(self.queue_entries)
1244
1245
1246 @property
1247 def owner_username(self):
1248 return self.job.owner
1249
1250
1251 def _working_directory(self):
1252 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001253
1254
jadmanski0afbb632008-06-06 21:10:57 +00001255 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001256 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001257 keyval_dict = self.job.keyval_dict()
1258 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001259 group_name = self.queue_entries[0].get_group_name()
1260 if group_name:
1261 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001262 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001263 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001264 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001265 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001266
1267
showard35162b02009-03-03 02:17:30 +00001268 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001269 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001270 _drone_manager.write_lines_to_file(error_file_path,
1271 [_LOST_PROCESS_ERROR])
1272
1273
showardd3dc1992009-04-22 21:01:40 +00001274 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001275 if not self.monitor:
1276 return
1277
showardd9205182009-04-27 20:09:55 +00001278 self._write_job_finished()
1279
showard35162b02009-03-03 02:17:30 +00001280 if self.monitor.lost_process:
1281 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001282
jadmanskif7fa2cc2008-10-01 14:13:23 +00001283
showardcbd74612008-11-19 21:42:02 +00001284 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001285 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001286 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001287 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001288 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001289
1290
jadmanskif7fa2cc2008-10-01 14:13:23 +00001291 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001292 if not self.monitor or not self.monitor.has_process():
1293 return
1294
jadmanskif7fa2cc2008-10-01 14:13:23 +00001295 # build up sets of all the aborted_by and aborted_on values
1296 aborted_by, aborted_on = set(), set()
1297 for queue_entry in self.queue_entries:
1298 if queue_entry.aborted_by:
1299 aborted_by.add(queue_entry.aborted_by)
1300 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1301 aborted_on.add(t)
1302
1303 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001304 # TODO(showard): this conditional is now obsolete, we just need to leave
1305 # it in temporarily for backwards compatibility over upgrades. delete
1306 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001307 assert len(aborted_by) <= 1
1308 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001309 aborted_by_value = aborted_by.pop()
1310 aborted_on_value = max(aborted_on)
1311 else:
1312 aborted_by_value = 'autotest_system'
1313 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001314
showarda0382352009-02-11 23:36:43 +00001315 self._write_keyval_after_job("aborted_by", aborted_by_value)
1316 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001317
showardcbd74612008-11-19 21:42:02 +00001318 aborted_on_string = str(datetime.datetime.fromtimestamp(
1319 aborted_on_value))
1320 self._write_status_comment('Job aborted by %s on %s' %
1321 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001322
1323
jadmanski0afbb632008-06-06 21:10:57 +00001324 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001325 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001326 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001327 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001328
1329
jadmanski0afbb632008-06-06 21:10:57 +00001330 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001331 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001332 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001333
1334
class QueueTask(AbstractQueueTask):
    """AbstractQueueTask variant for jobs that run against real hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Sanity-check statuses, then mark hosts RUNNING and dirty."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            # Mark the host dirty so cleanup runs after the job.
            entry.host.update_field('dirty', 1)


    def _finish_task(self):
        """Move entries to GATHERING; hosts stay RUNNING for log collection."""
        super(QueueTask, self)._finish_task()

        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Extend the base command line with repo-url verification."""
        base_invocation = super(QueueTask, self)._command_line()
        return base_invocation + ['--verify_job_repo_url']
1368
1369
Dan Shi1a189052013-10-28 14:41:35 -07001370class HostlessQueueTask(AbstractQueueTask):
mbligh4608b002010-01-05 18:22:35 +00001371 def __init__(self, queue_entry):
1372 super(HostlessQueueTask, self).__init__([queue_entry])
1373 self.queue_entry_ids = [queue_entry.id]
1374
1375
1376 def prolog(self):
1377 self.queue_entries[0].update_field('execution_subdir', 'hostless')
1378 super(HostlessQueueTask, self).prolog()
1379
1380
mbligh4608b002010-01-05 18:22:35 +00001381 def _finish_task(self):
1382 super(HostlessQueueTask, self)._finish_task()
Dan Shi76af8022013-10-19 01:59:49 -07001383
1384 # When a job is added to database, its initial status is always
1385 # Starting. In a scheduler tick, scheduler finds all jobs in Starting
1386 # status, check if any of them can be started. If scheduler hits some
1387 # limit, e.g., max_hostless_jobs_per_drone, max_jobs_started_per_cycle,
1388 # scheduler will leave these jobs in Starting status. Otherwise, the
1389 # jobs' status will be changed to Running, and an autoserv process will
1390 # be started in drone for each of these jobs.
1391 # If the entry is still in status Starting, the process has not started
1392 # yet. Therefore, there is no need to parse and collect log. Without
1393 # this check, exception will be raised by scheduler as execution_subdir
1394 # for this queue entry does not have a value yet.
1395 hqe = self.queue_entries[0]
1396 if hqe.status != models.HostQueueEntry.Status.STARTING:
1397 hqe.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001398
1399
mbligh36768f02008-02-22 18:28:33 +00001400if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00001401 main()