blob: ef305b4f581b2e75c3e0aa2ea8091bae22eb69ff [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
Aviv Keshet225bdfe2013-03-05 10:10:08 -08002#pylint: disable-msg=C0111
mbligh36768f02008-02-22 18:28:33 +00003
4"""
5Autotest scheduler
6"""
showard909c7a62008-07-15 21:52:38 +00007
Aviv Keshet225bdfe2013-03-05 10:10:08 -08008import datetime, optparse, os, signal
beeps5e2bb4a2013-10-28 11:26:45 -07009import sys, time
Aviv Keshet225bdfe2013-03-05 10:10:08 -080010import logging, gc
showard402934a2009-12-21 22:20:47 +000011
Alex Miller05d7b4c2013-03-04 07:49:38 -080012import common
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000014
15import django.db
16
showard136e6dc2009-06-10 19:38:49 +000017from autotest_lib.client.common_lib import global_config, logging_manager
beeps5e2bb4a2013-10-28 11:26:45 -070018from autotest_lib.client.common_lib import utils
showardb1e51872008-10-07 11:08:18 +000019from autotest_lib.database import database_connection
Alex Miller05d7b4c2013-03-04 07:49:38 -080020from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
beeps5e2bb4a2013-10-28 11:26:45 -070021from autotest_lib.scheduler import agent_task, drone_manager, drones
22from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
23from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
24from autotest_lib.scheduler import postjob_task, scheduler_logging_config
beepscc9fc702013-12-02 12:45:38 -080025from autotest_lib.scheduler import rdb_lib
26from autotest_lib.scheduler import rdb_utils
jamesrenc44ae992010-02-19 00:12:54 +000027from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080028from autotest_lib.scheduler import status_server, scheduler_config
Aviv Keshet308e7362013-05-21 14:43:16 -070029from autotest_lib.server import autoserv_utils
Fang Deng1d6c2a02013-04-17 15:25:45 -070030from autotest_lib.site_utils.graphite import stats
Alex Miller05d7b4c2013-03-04 07:49:38 -080031
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# dict.has_key() is deprecated (and removed in Python 3); use 'in' instead.
# AUTOTEST_DIR in the environment overrides the path derived from __file__.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

# Make the server directory importable for modules loaded later.
if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000046
# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level scheduler state, populated during startup.
_db = None          # set in initialize(); DatabaseConnection to AUTOTEST_WEB
_shutdown = False   # set True by handle_sigint() to stop the main loop

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None  # set in initialize_globals() to the drone_manager singleton
mbligh36768f02008-02-22 18:28:33 +000062
def _parser_path_default(install_dir):
    """Return the default path to the TKO results parser under install_dir."""
    return os.path.join(install_dir, 'tko', 'parse')

# Allow a site-specific module to override where the parser lives; fall back
# to the default implementation above when no site hook is present.
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
69
mbligh36768f02008-02-22 18:28:33 +000070
mbligh83c1e9e2009-05-01 23:10:41 +000071def _site_init_monitor_db_dummy():
72 return {}
73
74
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled but none is the default."""
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000080
81
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler."""
    _verify_default_drone_set_exists()
85
86
def main():
    """Entry point: run the scheduler, always removing the pidfile on exit."""
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # Deliberate exits (e.g. scheduler disabled) propagate untouched.
            raise
        except:
            # Log any other escaping exception, then re-raise it.
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000098
99
def main_without_exception_handling():
    """Parse arguments, initialize the scheduler, and run the tick loop."""
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Give a site-specific hook a chance to run before anything else.
    site_init = utils.import_site_function(__file__,
            "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
            _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Tick until SIGINT (sets _shutdown) or the status server requests a
        # shutdown.
        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Orderly teardown: flush queued mail, stop the status server, then shut
    # down the drone manager and the database connection.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000166
167
def setup_logging():
    """Configure scheduler logging; env vars may override log dir and name."""
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR')
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME')
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(),
            log_dir=log_dir, logfile_name=log_name)
174
175
def handle_sigint(signum, frame):
    """SIGINT handler: ask the main loop to exit after the current tick."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000180
181
def initialize():
    """One-time scheduler startup: pidfile, DB, signal handler, drones."""
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start if another scheduler instance already holds the pidfile.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point at the stress-test database instead of the production one.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    # Drones are the machines that run autoserv processes for the scheduler.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000219
220
def initialize_globals():
    """Bind the module-level _drone_manager to the shared singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
224
225
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line for the given machines.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns The autoserv command line as a list of executable + parameters.
    """
    return autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
showard87ba02a2009-04-20 19:37:32 +0000243
244
Simran Basia858a232012-08-21 11:04:37 -0700245class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800246
247
    def __init__(self):
        """Initialize dispatcher state from the global config and database."""
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # host id -> set of Agents, and queue entry id -> set of Agents.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        # Interval between garbage-collector stat reports (config is minutes,
        # stored here as seconds; defaults to 6 hours).
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)
mbligh36768f02008-02-22 18:28:33 +0000270
mbligh36768f02008-02-22 18:28:33 +0000271
    def initialize(self, recover_hosts=True):
        """Recover state left over from a previous scheduler run.

        @param recover_hosts: also attempt to recover dead hosts.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()
283
mbligh36768f02008-02-22 18:28:33 +0000284
Simran Basi0ec94dd2012-08-28 09:50:10 -0700285 def _log_tick_msg(self, msg):
286 if self._tick_debug:
287 logging.debug(msg)
288
289
Simran Basidef92872012-09-20 13:34:34 -0700290 def _log_extra_msg(self, msg):
291 if self._extra_debugging:
292 logging.debug(msg)
293
294
    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.
        """
        # NOTE(review): only the last two steps are wrapped in timer clients;
        # the earlier steps are only logged via _log_tick_msg.
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000336
showard97aed502008-11-04 02:01:24 +0000337
mblighf3294cc2009-04-08 21:17:38 +0000338 def _run_cleanup(self):
339 self._periodic_cleanup.run_cleanup_maybe()
340 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000341
mbligh36768f02008-02-22 18:28:33 +0000342
showardf13a9e22009-12-18 22:54:09 +0000343 def _garbage_collection(self):
344 threshold_time = time.time() - self._seconds_between_garbage_stats
345 if threshold_time < self._last_garbage_stats_time:
346 # Don't generate these reports very often.
347 return
348
349 self._last_garbage_stats_time = time.time()
350 # Force a full level 0 collection (because we can, it doesn't hurt
351 # at this interval).
352 gc.collect()
353 logging.info('Logging garbage collector stats on tick %d.',
354 self._tick_count)
355 gc_stats._log_garbage_collector_stats()
356
357
showard170873e2009-01-07 00:22:26 +0000358 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
359 for object_id in object_ids:
360 agent_dict.setdefault(object_id, set()).add(agent)
361
362
363 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
364 for object_id in object_ids:
365 assert object_id in agent_dict
366 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700367 # If an ID has no more active agent associated, there is no need to
368 # keep it in the dictionary. Otherwise, scheduler will keep an
369 # unnecessarily big dictionary until being restarted.
370 if not agent_dict[object_id]:
371 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000372
373
    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatchers list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to 1. a dict against the queue_entry_ids given to it 2. A dict
        against the host_ids given to it. So theoretically, a host can have any
        number of agents associated with it, and each of them can have any
        special agent task, though in practice we never see > 1 agent/task per
        host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000394
showard170873e2009-01-07 00:22:26 +0000395
396 def get_agents_for_entry(self, queue_entry):
397 """
398 Find agents corresponding to the specified queue_entry.
399 """
showardd3dc1992009-04-22 21:01:40 +0000400 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000401
402
403 def host_has_agent(self, host):
404 """
405 Determine if there is currently an Agent present using this host.
406 """
407 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000408
409
jadmanski0afbb632008-06-06 21:10:57 +0000410 def remove_agent(self, agent):
411 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000412 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
413 agent)
414 self._unregister_agent_for_ids(self._queue_entry_agents,
415 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000416
417
    def _host_has_scheduled_special_task(self, host):
        """Return True if *host* has a queued (not started) SpecialTask."""
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))
422
423
    def _recover_processes(self):
        """Reattach to autoserv processes that survived a scheduler restart."""
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000436
showard170873e2009-01-07 00:22:26 +0000437
showardd1195652009-12-08 22:21:02 +0000438 def _create_recovery_agent_tasks(self):
439 return (self._get_queue_entry_agent_tasks()
440 + self._get_special_task_agent_tasks(is_active=True))
441
442
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)
        stats.Gauge('scheduler.jobs_per_tick').send(
                'running', len(queue_entries))

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            # A synchronous job's task covers its whole entry group at once.
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000481
482
showardd1195652009-12-08 22:21:02 +0000483 def _get_special_task_agent_tasks(self, is_active=False):
484 special_tasks = models.SpecialTask.objects.filter(
485 is_active=is_active, is_complete=False)
486 return [self._get_agent_task_for_special_task(task)
487 for task in special_tasks]
488
489
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        @raises host_scheduler.SchedulerError: if the entry's status is not
                one this method knows how to resume.
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000515
516
517 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000518 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
519 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000520 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000521 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000522 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000523 if using_host:
showardd1195652009-12-08 22:21:02 +0000524 self._assert_host_has_no_agent(task_entry)
525
526
527 def _assert_host_has_no_agent(self, entry):
528 """
529 @param entry: a HostQueueEntry or a SpecialTask
530 """
531 if self.host_has_agent(entry.host):
532 agent = tuple(self._host_agents.get(entry.host.id))[0]
Dale Curtisaa513362011-03-01 17:27:44 -0800533 raise host_scheduler.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000534 'While scheduling %s, host %s already has a host agent %s'
535 % (entry, entry.host, agent.task))
536
537
    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
        object contains a hqe it's passed on to the special agent task, which
        creates a HostQueueEntry and saves it as its queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        @raises host_scheduler.SchedulerError: if no AgentTask class declares
                a TASK_TYPE matching this task.
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        # Dispatch on the TASK_TYPE constant each AgentTask class declares.
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000568
569
570 def _register_pidfiles(self, agent_tasks):
571 for agent_task in agent_tasks:
572 agent_task.register_necessary_pidfiles()
573
574
575 def _recover_tasks(self, agent_tasks):
576 orphans = _drone_manager.get_orphaned_autoserv_processes()
577
578 for agent_task in agent_tasks:
579 agent_task.recover()
580 if agent_task.monitor and agent_task.monitor.has_process():
581 orphans.discard(agent_task.monitor.get_process())
582 self.add_agent_task(agent_task)
583
584 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000585
586
    def _get_unassigned_entries(self, status):
        """Yield HostQueueEntries in *status* that no agent is handling yet."""
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                           % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry
594
595
    def _check_for_remaining_orphan_processes(self, orphans):
        """Email about unrecovered orphan autoserv processes.

        @param orphans: collection of processes that could not be recovered.
        @raises RuntimeError: if the die_on_orphans config option is enabled.
        """
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000608
showard170873e2009-01-07 00:22:26 +0000609
showard8cc058f2009-09-08 16:26:33 +0000610 def _recover_pending_entries(self):
611 for entry in self._get_unassigned_entries(
612 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000613 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000614 entry.on_pending()
615
616
showardb8900452009-10-12 20:31:01 +0000617 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000618 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000619 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
620 unrecovered_hqes = []
621 for queue_entry in queue_entries:
622 special_tasks = models.SpecialTask.objects.filter(
623 task__in=(models.SpecialTask.Task.CLEANUP,
624 models.SpecialTask.Task.VERIFY),
625 queue_entry__id=queue_entry.id,
626 is_complete=False)
627 if special_tasks.count() == 0:
628 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000629
showardb8900452009-10-12 20:31:01 +0000630 if unrecovered_hqes:
631 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Dale Curtisaa513362011-03-01 17:27:44 -0800632 raise host_scheduler.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000633 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000634 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000635
636
showard65db3932009-10-28 19:54:35 +0000637 def _get_prioritized_special_tasks(self):
638 """
639 Returns all queued SpecialTasks prioritized for repair first, then
640 cleanup, then verify.
beeps8bb1f7d2013-08-05 01:30:09 -0700641
642 @return: list of afe.models.SpecialTasks sorted according to priority.
showard65db3932009-10-28 19:54:35 +0000643 """
644 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
645 is_complete=False,
646 host__locked=False)
647 # exclude hosts with active queue entries unless the SpecialTask is for
648 # that queue entry
showard7e67b432010-01-20 01:13:04 +0000649 queued_tasks = models.SpecialTask.objects.add_join(
showardeab66ce2009-12-23 00:03:56 +0000650 queued_tasks, 'afe_host_queue_entries', 'host_id',
651 join_condition='afe_host_queue_entries.active',
showard7e67b432010-01-20 01:13:04 +0000652 join_from_key='host_id', force_left_join=True)
showard65db3932009-10-28 19:54:35 +0000653 queued_tasks = queued_tasks.extra(
showardeab66ce2009-12-23 00:03:56 +0000654 where=['(afe_host_queue_entries.id IS NULL OR '
655 'afe_host_queue_entries.id = '
656 'afe_special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000657
showard65db3932009-10-28 19:54:35 +0000658 # reorder tasks by priority
659 task_priority_order = [models.SpecialTask.Task.REPAIR,
660 models.SpecialTask.Task.CLEANUP,
Dan Shi07e09af2013-04-12 09:31:29 -0700661 models.SpecialTask.Task.VERIFY,
Alex Millerdfff2fd2013-05-28 13:05:06 -0700662 models.SpecialTask.Task.RESET,
663 models.SpecialTask.Task.PROVISION]
showard65db3932009-10-28 19:54:35 +0000664 def task_priority_key(task):
665 return task_priority_order.index(task.task)
666 return sorted(queued_tasks, key=task_priority_key)
667
668
showard65db3932009-10-28 19:54:35 +0000669 def _schedule_special_tasks(self):
670 """
671 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700672
673 Special tasks include PreJobTasks like verify, reset and cleanup.
674 They are created through _schedule_new_jobs and associated with a hqe
675 This method translates SpecialTasks to the appropriate AgentTask and
676 adds them to the dispatchers agents list, so _handle_agents can execute
677 them.
showard65db3932009-10-28 19:54:35 +0000678 """
679 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000680 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000681 continue
showardd1195652009-12-08 22:21:02 +0000682 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000683
684
showard170873e2009-01-07 00:22:26 +0000685 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000686 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000687 # should never happen
showarded2afea2009-07-07 20:54:07 +0000688 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000689 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000690 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700691 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000692 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000693
694
jadmanski0afbb632008-06-06 21:10:57 +0000695 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000696 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700697 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000698 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000699 if self.host_has_agent(host):
700 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000701 continue
showard8cc058f2009-09-08 16:26:33 +0000702 if self._host_has_scheduled_special_task(host):
703 # host will have a special task scheduled on the next cycle
704 continue
showard170873e2009-01-07 00:22:26 +0000705 if print_message:
showardb18134f2009-03-20 20:52:18 +0000706 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000707 models.SpecialTask.objects.create(
708 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000709 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000710
711
jadmanski0afbb632008-06-06 21:10:57 +0000712 def _recover_hosts(self):
713 # recover "Repair Failed" hosts
714 message = 'Reverifying dead host %s'
715 self._reverify_hosts_where("status = 'Repair Failed'",
716 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000717
718
showard04c82c52008-05-29 19:38:12 +0000719
showardb95b1bd2008-08-15 18:11:04 +0000720 def _get_pending_queue_entries(self):
beeps7d8a1b12013-10-29 17:58:34 -0700721 """
722 Fetch a list of new host queue entries.
723
724 The ordering of this list is important, as every new agent
725 we schedule can potentially contribute to the process count
726 on the drone, which has a static limit. The sort order
727 prioritizes jobs as follows:
728 1. High priority jobs: Based on the afe_job's priority
729 2. With hosts and metahosts: This will only happen if we don't
730 activate the hqe after assigning a host to it in
731 schedule_new_jobs.
732 3. With hosts but without metahosts: When tests are scheduled
733 through the frontend the owner of the job would have chosen
734 a host for it.
735 4. Without hosts but with metahosts: This is the common case of
736 a new test that needs a DUT. We assign a host and set it to
737 active so it shouldn't show up in case 2 on the next tick.
738 5. Without hosts and without metahosts: Hostless suite jobs, that
739 will result in new jobs that fall under category 4.
740
741 A note about the ordering of cases 3 and 4:
742 Prioritizing one case above the other leads to earlier acquisition
743 of the following resources: 1. process slots on the drone 2. machines.
744 - When a user schedules a job through the afe they choose a specific
745 host for it. Jobs with metahost can utilize any host that satisfies
746 the metahost criterion. This means that if we had scheduled 4 before
747 3 there is a good chance that a job which could've used another host,
748 will now use the host assigned to a metahost-less job. Given the
749 availability of machines in pool:suites, this almost guarantees
750 starvation for jobs scheduled through the frontend.
751 - Scheduling 4 before 3 also has its pros however, since a suite
752 has the concept of a time out, whereas users can wait. If we hit the
753 process count on the drone a suite can timeout waiting on the test,
754 but a user job generally has a much longer timeout, and relatively
755 harmless consequences.
756 The current ordering was chosed because it is more likely that we will
757 run out of machines in pool:suites than processes on the drone.
758
759 @returns A list of HQEs ordered according to sort_order.
760 """
761 sort_order = ('afe_jobs.priority DESC, '
762 'ISNULL(host_id), '
763 'ISNULL(meta_host), '
Alex Millerd3614042014-01-13 15:58:18 -0800764 'parent_job_id, '
beeps7d8a1b12013-10-29 17:58:34 -0700765 'job_id')
beeps7d8273b2013-11-06 09:44:34 -0800766 query=('NOT complete AND NOT active AND status="Queued"'
767 'AND NOT aborted')
jamesrenc44ae992010-02-19 00:12:54 +0000768 return list(scheduler_models.HostQueueEntry.fetch(
showardeab66ce2009-12-23 00:03:56 +0000769 joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
beeps7d8273b2013-11-06 09:44:34 -0800770 where=query, order_by=sort_order))
mbligh36768f02008-02-22 18:28:33 +0000771
772
showard89f84db2009-03-12 20:39:13 +0000773 def _refresh_pending_queue_entries(self):
774 """
775 Lookup the pending HostQueueEntries and call our HostScheduler
776 refresh() method given that list. Return the list.
777
778 @returns A list of pending HostQueueEntries sorted in priority order.
779 """
showard63a34772008-08-18 19:32:50 +0000780 queue_entries = self._get_pending_queue_entries()
781 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000782 return []
showardb95b1bd2008-08-15 18:11:04 +0000783
showard63a34772008-08-18 19:32:50 +0000784 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000785
showard89f84db2009-03-12 20:39:13 +0000786 return queue_entries
787
788
789 def _schedule_atomic_group(self, queue_entry):
790 """
791 Schedule the given queue_entry on an atomic group of hosts.
792
793 Returns immediately if there are insufficient available hosts.
794
795 Creates new HostQueueEntries based off of queue_entry for the
796 scheduled hosts and starts them all running.
797 """
798 # This is a virtual host queue entry representing an entire
799 # atomic group, find a group and schedule their hosts.
800 group_hosts = self._host_scheduler.find_eligible_atomic_group(
801 queue_entry)
802 if not group_hosts:
803 return
showardcbe6f942009-06-17 19:33:49 +0000804
805 logging.info('Expanding atomic group entry %s with hosts %s',
806 queue_entry,
807 ', '.join(host.hostname for host in group_hosts))
jamesren883492a2010-02-12 00:45:18 +0000808
showard89f84db2009-03-12 20:39:13 +0000809 for assigned_host in group_hosts[1:]:
810 # Create a new HQE for every additional assigned_host.
jamesrenc44ae992010-02-19 00:12:54 +0000811 new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000812 new_hqe.save()
jamesren883492a2010-02-12 00:45:18 +0000813 new_hqe.set_host(assigned_host)
814 self._run_queue_entry(new_hqe)
815
816 # The first assigned host uses the original HostQueueEntry
817 queue_entry.set_host(group_hosts[0])
818 self._run_queue_entry(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000819
820
showarda9545c02009-12-18 22:44:26 +0000821 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800822 """Schedule a hostless (suite) job.
823
824 @param queue_entry: The queue_entry representing the hostless job.
825 """
showarda9545c02009-12-18 22:44:26 +0000826 self.add_agent_task(HostlessQueueTask(queue_entry))
jamesren47bd7372010-03-13 00:58:17 +0000827 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000828
829
beepscc9fc702013-12-02 12:45:38 -0800830 def _schedule_host_job(self, host, queue_entry):
831 """Schedules a job on the given host.
832
833 1. Assign the host to the hqe, if it isn't already assigned.
834 2. Create a SpecialAgentTask for the hqe.
835 3. Activate the hqe.
836
837 @param queue_entry: The job to schedule.
838 @param host: The host to schedule the job on.
839 """
840 if self.host_has_agent(host):
841 host_agent_task = list(self._host_agents.get(host.id))[0].task
842 subject = 'Host with agents assigned to an HQE'
843 message = ('HQE: %s assigned host %s, but the host has '
844 'agent: %s for queue_entry %s. The HQE '
beepsf7d35162014-02-13 16:42:13 -0800845 'will have to try and acquire a host next tick ' %
beepscc9fc702013-12-02 12:45:38 -0800846 (queue_entry, host.hostname, host_agent_task,
847 host_agent_task.queue_entry))
848 email_manager.manager.enqueue_notify_email(subject, message)
beepscc9fc702013-12-02 12:45:38 -0800849 else:
850 if queue_entry.host_id is None:
851 queue_entry.set_host(host)
852 else:
853 if host.id != queue_entry.host_id:
854 raise rdb_utils.RDBException('The rdb returned host: %s '
855 'but the job:%s was already assigned a host: %s. ' %
856 (host.hostname, queue_entry.job_id,
857 queue_entry.host.hostname))
858 queue_entry.update_field('active', True)
859 self._run_queue_entry(queue_entry)
860
861
showard89f84db2009-03-12 20:39:13 +0000862 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700863 """
864 Find any new HQEs and call schedule_pre_job_tasks for it.
865
866 This involves setting the status of the HQE and creating a row in the
867 db corresponding the the special task, through
868 scheduler_models._queue_special_task. The new db row is then added as
869 an agent to the dispatcher through _schedule_special_tasks and
870 scheduled for execution on the drone through _handle_agents.
871 """
showard89f84db2009-03-12 20:39:13 +0000872 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000873
beepscc9fc702013-12-02 12:45:38 -0800874 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700875 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700876 new_jobs_with_hosts = 0
877 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800878 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700879 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000880
beepscc9fc702013-12-02 12:45:38 -0800881 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000882 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000883 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700884 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000885 else:
beepscc9fc702013-12-02 12:45:38 -0800886 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700887 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700888
beepsb255fc52013-10-13 23:28:54 -0700889 stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
beepscc9fc702013-12-02 12:45:38 -0800890 if not host_jobs:
891 return
892
893 hosts = rdb_lib.acquire_hosts(self._host_scheduler, host_jobs)
894 for host, queue_entry in zip(hosts, host_jobs):
895 if host:
896 self._schedule_host_job(host, queue_entry)
897 new_jobs_with_hosts = new_jobs_with_hosts + 1
898
beepsb255fc52013-10-13 23:28:54 -0700899 stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
900 stats.Gauge(key).send('new_jobs_without_hosts',
901 new_jobs_need_hosts - new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000902
903
showard8cc058f2009-09-08 16:26:33 +0000904 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700905 """
906 Adds agents to the dispatcher.
907
908 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
909 QueueTask for example, will have a job with a control file, and
910 the agent will have methods that poll, abort and check if the queue
911 task is finished. The dispatcher runs the agent_task, as well as
912 other agents in it's _agents member, through _handle_agents, by
913 calling the Agents tick().
914
915 This method creates an agent for each HQE in one of (starting, running,
916 gathering, parsing, archiving) states, and adds it to the dispatcher so
917 it is handled by _handle_agents.
918 """
showardd1195652009-12-08 22:21:02 +0000919 for agent_task in self._get_queue_entry_agent_tasks():
920 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000921
922
923 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000924 for entry in scheduler_models.HostQueueEntry.fetch(
925 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000926 task = entry.job.schedule_delayed_callback_task(entry)
927 if task:
showardd1195652009-12-08 22:21:02 +0000928 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000929
930
jamesren883492a2010-02-12 00:45:18 +0000931 def _run_queue_entry(self, queue_entry):
Simran Basidef92872012-09-20 13:34:34 -0700932 self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
933 queue_entry)
jamesren883492a2010-02-12 00:45:18 +0000934 queue_entry.schedule_pre_job_tasks()
mblighd5c95802008-03-05 00:33:46 +0000935
936
jadmanski0afbb632008-06-06 21:10:57 +0000937 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700938 """
939 Looks through the afe_host_queue_entries for an aborted entry.
940
941 The aborted bit is set on an HQE in many ways, the most common
942 being when a user requests an abort through the frontend, which
943 results in an rpc from the afe to abort_host_queue_entries.
944 """
jamesrene7c65cb2010-06-08 20:38:10 +0000945 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000946 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700947 where='aborted=1 and complete=0'):
showardf4a2e502009-07-28 20:06:39 +0000948 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800949
950 # The task would have started off with both is_complete and
951 # is_active = False. Aborted tasks are neither active nor complete.
952 # For all currently active tasks this will happen through the agent,
953 # but we need to manually update the special tasks that haven't
954 # started yet, because they don't have agents.
955 models.SpecialTask.objects.filter(is_active=False,
956 queue_entry_id=entry.id).update(is_complete=True)
957
showardd3dc1992009-04-22 21:01:40 +0000958 for agent in self.get_agents_for_entry(entry):
959 agent.abort()
960 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000961 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700962 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000963 for job in jobs_to_stop:
964 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000965
966
beeps8bb1f7d2013-08-05 01:30:09 -0700967 def _find_aborted_special_tasks(self):
968 """
969 Find SpecialTasks that have been marked for abortion.
970
971 Poll the database looking for SpecialTasks that are active
972 and have been marked for abortion, then abort them.
973 """
974
975 # The completed and active bits are very important when it comes
976 # to scheduler correctness. The active bit is set through the prolog
977 # of a special task, and reset through the cleanup method of the
978 # SpecialAgentTask. The cleanup is called both through the abort and
979 # epilog. The complete bit is set in several places, and in general
980 # a hanging job will have is_active=1 is_complete=0, while a special
981 # task which completed will have is_active=0 is_complete=1. To check
982 # aborts we directly check active because the complete bit is set in
983 # several places, including the epilog of agent tasks.
984 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
985 is_aborted=True)
986 for task in aborted_tasks:
987 # There are 2 ways to get the agent associated with a task,
988 # through the host and through the hqe. A special task
989 # always needs a host, but doesn't always need a hqe.
990 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700991 if isinstance(agent.task, agent_task.SpecialAgentTask):
beeps8bb1f7d2013-08-05 01:30:09 -0700992
993 # The epilog preforms critical actions such as
994 # queueing the next SpecialTask, requeuing the
995 # hqe etc, however it doesn't actually kill the
996 # monitor process and set the 'done' bit. Epilogs
997 # assume that the job failed, and that the monitor
998 # process has already written an exit code. The
999 # done bit is a necessary condition for
1000 # _handle_agents to schedule any more special
1001 # tasks against the host, and it must be set
1002 # in addition to is_active, is_complete and success.
1003 agent.task.epilog()
1004 agent.task.abort()
1005
1006
showard324bf812009-01-20 23:23:38 +00001007 def _can_start_agent(self, agent, num_started_this_cycle,
1008 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001009 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +00001010 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +00001011 return True
1012 # don't allow any nonzero-process agents to run after we've reached a
1013 # limit (this avoids starvation of many-process agents)
1014 if have_reached_limit:
1015 return False
1016 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001017 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +00001018 agent.task.owner_username,
1019 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +00001020 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001021 return False
1022 # if a single agent exceeds the per-cycle throttling, still allow it to
1023 # run when it's the first agent in the cycle
1024 if num_started_this_cycle == 0:
1025 return True
1026 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +00001027 if (num_started_this_cycle + agent.task.num_processes >
1028 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001029 return False
1030 return True
1031
1032
jadmanski0afbb632008-06-06 21:10:57 +00001033 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -07001034 """
1035 Handles agents of the dispatcher.
1036
1037 Appropriate Agents are added to the dispatcher through
1038 _schedule_running_host_queue_entries. These agents each
1039 have a task. This method runs the agents task through
1040 agent.tick() leading to:
1041 agent.start
1042 prolog -> AgentTasks prolog
1043 For each queue entry:
1044 sets host status/status to Running
1045 set started_on in afe_host_queue_entries
1046 run -> AgentTasks run
1047 Creates PidfileRunMonitor
1048 Queues the autoserv command line for this AgentTask
1049 via the drone manager. These commands are executed
1050 through the drone managers execute actions.
1051 poll -> AgentTasks/BaseAgentTask poll
1052 checks the monitors exit_code.
1053 Executes epilog if task is finished.
1054 Executes AgentTasks _finish_task
1055 finish_task is usually responsible for setting the status
1056 of the HQE/host, and updating it's active and complete fileds.
1057
1058 agent.is_done
1059 Removed the agent from the dispatchers _agents queue.
1060 Is_done checks the finished bit on the agent, that is
1061 set based on the Agents task. During the agents poll
1062 we check to see if the monitor process has exited in
1063 it's finish method, and set the success member of the
1064 task based on this exit code.
1065 """
jadmanski0afbb632008-06-06 21:10:57 +00001066 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001067 have_reached_limit = False
1068 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -07001069 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +00001070 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -07001071 self._log_extra_msg('Processing Agent with Host Ids: %s and '
1072 'queue_entry ids:%s' % (agent.host_ids,
1073 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +00001074 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001075 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001076 have_reached_limit):
1077 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -07001078 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +00001079 continue
showardd1195652009-12-08 22:21:02 +00001080 num_started_this_cycle += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001081 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +00001082 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -07001083 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +00001084 if agent.is_done():
Simran Basidef92872012-09-20 13:34:34 -07001085 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +00001086 self.remove_agent(agent)
Simran Basi3f6717d2012-09-13 15:21:22 -07001087 logging.info('%d running processes. %d added this cycle.',
1088 _drone_manager.total_running_processes(),
1089 num_started_this_cycle)
mbligh36768f02008-02-22 18:28:33 +00001090
1091
showard29f7cd22009-04-29 21:16:24 +00001092 def _process_recurring_runs(self):
1093 recurring_runs = models.RecurringRun.objects.filter(
1094 start_date__lte=datetime.datetime.now())
1095 for rrun in recurring_runs:
1096 # Create job from template
1097 job = rrun.job
1098 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001099 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001100
1101 host_objects = info['hosts']
1102 one_time_hosts = info['one_time_hosts']
1103 metahost_objects = info['meta_hosts']
1104 dependencies = info['dependencies']
1105 atomic_group = info['atomic_group']
1106
1107 for host in one_time_hosts or []:
1108 this_host = models.Host.create_one_time_host(host.hostname)
1109 host_objects.append(this_host)
1110
1111 try:
1112 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001113 options=options,
showard29f7cd22009-04-29 21:16:24 +00001114 host_objects=host_objects,
1115 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001116 atomic_group=atomic_group)
1117
1118 except Exception, ex:
1119 logging.exception(ex)
1120 #TODO send email
1121
1122 if rrun.loop_count == 1:
1123 rrun.delete()
1124 else:
1125 if rrun.loop_count != 0: # if not infinite loop
1126 # calculate new start_date
1127 difference = datetime.timedelta(seconds=rrun.loop_period)
1128 rrun.start_date = rrun.start_date + difference
1129 rrun.loop_count -= 1
1130 rrun.save()
1131
1132
# Site-extensibility hook: import_site_class presumably returns the
# site-provided SiteDispatcher from site_monitor_db when one exists,
# falling back to BaseDispatcher otherwise -- TODO confirm against
# utils.import_site_class.
SiteDispatcher = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    # Concrete dispatcher used by the scheduler; site-specific behavior
    # comes first in the MRO via SiteDispatcher.
    pass
1139
1140
class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task. An agent wraps
    around an AgentTask mainly to associate the AgentTask with the queue_entry
    and host ids.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state. If the task succeeded.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested. The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: An instance of an AgentTask.
        """
        self.task = task

        # Filled in by Dispatcher.add_agent().
        self.dispatcher = None

        # Mirror the task's associations so the dispatcher can index this
        # agent by host id and queue entry id.
        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        """Poll the wrapped task once, latching the finished flag."""
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        """@returns True once the wrapped task has finished."""
        return self.finished


    def abort(self):
        """Forward an abort request to the task.

        Tasks can choose to ignore aborts; the agent only finishes when
        the task actually set its aborted flag.
        """
        if self.task:
            self.task.abort()
            if self.task.aborted:
                self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001196
showardd3dc1992009-04-22 21:01:40 +00001197
beeps5e2bb4a2013-10-28 11:26:45 -07001198class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001199 """
1200 Common functionality for QueueTask and HostlessQueueTask
1201 """
1202 def __init__(self, queue_entries):
1203 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001204 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001205 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001206
1207
showard73ec0442009-02-07 02:05:20 +00001208 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001209 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001210
1211
jamesrenc44ae992010-02-19 00:12:54 +00001212 def _write_control_file(self, execution_path):
1213 control_path = _drone_manager.attach_file_to_execution(
1214 execution_path, self.job.control_file)
1215 return control_path
1216
1217
Aviv Keshet308e7362013-05-21 14:43:16 -07001218 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001219 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001220 execution_path = self.queue_entries[0].execution_path()
1221 control_path = self._write_control_file(execution_path)
1222 hostnames = ','.join(entry.host.hostname
1223 for entry in self.queue_entries
1224 if not entry.is_hostless())
1225
1226 execution_tag = self.queue_entries[0].execution_tag()
1227 params = _autoserv_command_line(
1228 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001229 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001230 _drone_manager.absolute_path(control_path)],
1231 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001232 if self.job.is_image_update_job():
1233 params += ['--image', self.job.update_image_path]
1234
jamesrenc44ae992010-02-19 00:12:54 +00001235 return params
showardd1195652009-12-08 22:21:02 +00001236
1237
1238 @property
1239 def num_processes(self):
1240 return len(self.queue_entries)
1241
1242
1243 @property
1244 def owner_username(self):
1245 return self.job.owner
1246
1247
1248 def _working_directory(self):
1249 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001250
1251
jadmanski0afbb632008-06-06 21:10:57 +00001252 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001253 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001254 keyval_dict = self.job.keyval_dict()
1255 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001256 group_name = self.queue_entries[0].get_group_name()
1257 if group_name:
1258 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001259 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001260 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001261 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001262 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001263
1264
showard35162b02009-03-03 02:17:30 +00001265 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001266 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001267 _drone_manager.write_lines_to_file(error_file_path,
1268 [_LOST_PROCESS_ERROR])
1269
1270
showardd3dc1992009-04-22 21:01:40 +00001271 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001272 if not self.monitor:
1273 return
1274
showardd9205182009-04-27 20:09:55 +00001275 self._write_job_finished()
1276
showard35162b02009-03-03 02:17:30 +00001277 if self.monitor.lost_process:
1278 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001279
jadmanskif7fa2cc2008-10-01 14:13:23 +00001280
showardcbd74612008-11-19 21:42:02 +00001281 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001282 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001283 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001284 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001285 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001286
1287
jadmanskif7fa2cc2008-10-01 14:13:23 +00001288 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001289 if not self.monitor or not self.monitor.has_process():
1290 return
1291
jadmanskif7fa2cc2008-10-01 14:13:23 +00001292 # build up sets of all the aborted_by and aborted_on values
1293 aborted_by, aborted_on = set(), set()
1294 for queue_entry in self.queue_entries:
1295 if queue_entry.aborted_by:
1296 aborted_by.add(queue_entry.aborted_by)
1297 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1298 aborted_on.add(t)
1299
1300 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001301 # TODO(showard): this conditional is now obsolete, we just need to leave
1302 # it in temporarily for backwards compatibility over upgrades. delete
1303 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001304 assert len(aborted_by) <= 1
1305 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001306 aborted_by_value = aborted_by.pop()
1307 aborted_on_value = max(aborted_on)
1308 else:
1309 aborted_by_value = 'autotest_system'
1310 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001311
showarda0382352009-02-11 23:36:43 +00001312 self._write_keyval_after_job("aborted_by", aborted_by_value)
1313 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001314
showardcbd74612008-11-19 21:42:02 +00001315 aborted_on_string = str(datetime.datetime.fromtimestamp(
1316 aborted_on_value))
1317 self._write_status_comment('Job aborted by %s on %s' %
1318 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001319
1320
jadmanski0afbb632008-06-06 21:10:57 +00001321 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001322 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001323 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001324 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001325
1326
jadmanski0afbb632008-06-06 21:10:57 +00001327 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001328 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001329 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001330
1331
class QueueTask(AbstractQueueTask):
    """Runs autoserv for a job against one or more real hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Sanity-check entry/host statuses, then mark hosts Running/dirty."""
        hqe_status = models.HostQueueEntry.Status
        host_status = models.Host.Status
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(hqe_status.STARTING, hqe_status.RUNNING),
                allowed_host_statuses=(host_status.PENDING,
                                       host_status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            # Per-host keyvals first, then flip the host to Running and
            # flag it dirty so it gets cleaned after the job.
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)


    def _finish_task(self):
        super(QueueTask, self)._finish_task()

        # Hand entries off to the Gathering stage; hosts stay Running.
        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Extend the base autoserv invocation with job-repo-url checking."""
        return (super(QueueTask, self)._command_line()
                + ['--verify_job_repo_url'])
1365
1366
Dan Shi1a189052013-10-28 14:41:35 -07001367class HostlessQueueTask(AbstractQueueTask):
mbligh4608b002010-01-05 18:22:35 +00001368 def __init__(self, queue_entry):
1369 super(HostlessQueueTask, self).__init__([queue_entry])
1370 self.queue_entry_ids = [queue_entry.id]
1371
1372
1373 def prolog(self):
1374 self.queue_entries[0].update_field('execution_subdir', 'hostless')
1375 super(HostlessQueueTask, self).prolog()
1376
1377
mbligh4608b002010-01-05 18:22:35 +00001378 def _finish_task(self):
1379 super(HostlessQueueTask, self)._finish_task()
Dan Shi76af8022013-10-19 01:59:49 -07001380
1381 # When a job is added to database, its initial status is always
1382 # Starting. In a scheduler tick, scheduler finds all jobs in Starting
1383 # status, check if any of them can be started. If scheduler hits some
1384 # limit, e.g., max_hostless_jobs_per_drone, max_jobs_started_per_cycle,
1385 # scheduler will leave these jobs in Starting status. Otherwise, the
1386 # jobs' status will be changed to Running, and an autoserv process will
1387 # be started in drone for each of these jobs.
1388 # If the entry is still in status Starting, the process has not started
1389 # yet. Therefore, there is no need to parse and collect log. Without
1390 # this check, exception will be raised by scheduler as execution_subdir
1391 # for this queue entry does not have a value yet.
1392 hqe = self.queue_entries[0]
1393 if hqe.status != models.HostQueueEntry.Status.STARTING:
1394 hqe.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001395
1396
if __name__ == '__main__':
    # Script entry point; main() is defined earlier in this file.
    main()