blob: b9924a6c05bc7559dd6f2a158942e156ca69a1c0 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
Aviv Keshet225bdfe2013-03-05 10:10:08 -08002#pylint: disable-msg=C0111
mbligh36768f02008-02-22 18:28:33 +00003
4"""
5Autotest scheduler
6"""
showard909c7a62008-07-15 21:52:38 +00007
mbligh36768f02008-02-22 18:28:33 +00008
Aviv Keshet225bdfe2013-03-05 10:10:08 -08009import datetime, optparse, os, signal
10import sys, time, traceback, urllib
11import logging, gc
showard402934a2009-12-21 22:20:47 +000012
Alex Miller05d7b4c2013-03-04 07:49:38 -080013import common
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000015
16import django.db
17
showard136e6dc2009-06-10 19:38:49 +000018from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000019from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000020from autotest_lib.database import database_connection
jamesrendd855242010-03-02 22:23:44 +000021from autotest_lib.frontend.afe import model_attributes
Alex Miller05d7b4c2013-03-04 07:49:38 -080022from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000023from autotest_lib.scheduler import drone_manager, drones, email_manager
Dale Curtisaa513362011-03-01 17:27:44 -080024from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
Alex Miller05d7b4c2013-03-04 07:49:38 -080025from autotest_lib.scheduler import scheduler_logging_config
jamesrenc44ae992010-02-19 00:12:54 +000026from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080027from autotest_lib.scheduler import status_server, scheduler_config
Aviv Keshet308e7362013-05-21 14:43:16 -070028from autotest_lib.server import autoserv_utils
Alex Millerdfff2fd2013-05-28 13:05:06 -070029from autotest_lib.server.cros import provision
Fang Deng1d6c2a02013-04-17 15:25:45 -070030from autotest_lib.site_utils.graphite import stats
Alex Miller05d7b4c2013-03-04 07:49:38 -080031
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Default results dir; overwritten by the command-line argument parsed in
# main_without_exception_handling().
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the install location to be overridden from the environment.
# NOTE: dict.has_key() is deprecated (and removed in Python 3); membership
# test with 'in' is the supported spelling.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level scheduler state, populated during initialize().
_db = None
_shutdown = False
_autoserv_directory = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server')
_autoserv_path = os.path.join(_autoserv_directory, 'autoserv')
_testing_mode = False
_drone_manager = None
mbligh36768f02008-02-22 18:28:33 +000061
Eric Lie0493a42010-11-15 13:05:43 -080062def _parser_path_default(install_dir):
63 return os.path.join(install_dir, 'tko', 'parse')
# Resolve the parser path through an optional site-specific hook, falling
# back to the default <install_dir>/tko/parse location.
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
68
mbligh36768f02008-02-22 18:28:33 +000069
def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    minutes = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * minutes
75
76
mbligh83c1e9e2009-05-01 23:10:41 +000077def _site_init_monitor_db_dummy():
78 return {}
79
80
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled without a default."""
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise host_scheduler.SchedulerError(
            'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000086
87
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    # Currently the only consistency check is the drone-set configuration.
    _verify_default_drone_set_exists()
91
92
def main():
    """Entry point: run the scheduler and always clean up the pid file."""
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # Deliberate exits (e.g. scheduler disabled) pass through silently.
            raise
        except:
            # Log anything else before re-raising so the crash is recorded.
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        # Remove our pid file whether we exited cleanly or crashed.
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000104
105
def main_without_exception_handling():
    """Parse arguments, initialize, and run the scheduler loop until shutdown.

    Exits with status 1 if the scheduler is disabled in the global config.
    The single positional argument is the results directory.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Optional site-specific initialization hook.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop: one tick at a time until a shutdown is requested either
        # via SIGINT (_shutdown) or through the status server.
        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # Best-effort: report the crash by email rather than re-raising, so
        # the cleanup below still runs.
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000172
173
def setup_logging():
    """Configure scheduler logging, honoring optional environment overrides."""
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_config = scheduler_logging_config.SchedulerLoggingConfig()
    logging_manager.configure_logging(logging_config, log_dir=log_dir,
                                      logfile_name=log_name)
180
181
def handle_sigint(signum, frame):
    """SIGINT handler: request a graceful shutdown of the scheduler loop."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000186
187
def initialize():
    """One-time scheduler startup.

    Writes the pid file (aborting if another scheduler is alive), connects
    the database, installs the SIGINT handler, and initializes the drone
    manager from the global config.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    # Make sure autoserv and friends are found first on PATH.
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    # Renamed from 'drones': the old local name shadowed the imported
    # 'drones' module inside this function.
    drones_csv = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones_csv.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000225
226
def initialize_globals():
    """Bind the module-level _drone_manager to the shared singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
230
231
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line for a job.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @returns The autoserv command line as a list of executable + parameters.
    """
    return autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
showard87ba02a2009-04-20 19:37:32 +0000249
250
Simran Basia858a232012-08-21 11:04:37 -0700251class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800252
253
    def __init__(self):
        """Set up dispatcher state, cleanup helpers, and config knobs."""
        # Agents currently being managed by this dispatcher.
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # Maps host id -> set of Agents, and queue entry id -> set of Agents.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        # Interval between garbage-collector stat reports, in seconds
        # (config value is in minutes; defaults to 6 hours).
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        # Debug-logging switches, both off by default.
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)
mbligh36768f02008-02-22 18:28:33 +0000276
mbligh36768f02008-02-22 18:28:33 +0000277
showard915958d2009-04-22 21:00:58 +0000278 def initialize(self, recover_hosts=True):
279 self._periodic_cleanup.initialize()
280 self._24hr_upkeep.initialize()
281
jadmanski0afbb632008-06-06 21:10:57 +0000282 # always recover processes
283 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000284
jadmanski0afbb632008-06-06 21:10:57 +0000285 if recover_hosts:
286 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000287
jamesrenc44ae992010-02-19 00:12:54 +0000288 self._host_scheduler.recovery_on_startup()
289
mbligh36768f02008-02-22 18:28:33 +0000290
Simran Basi0ec94dd2012-08-28 09:50:10 -0700291 def _log_tick_msg(self, msg):
292 if self._tick_debug:
293 logging.debug(msg)
294
295
Simran Basidef92872012-09-20 13:34:34 -0700296 def _log_extra_msg(self, msg):
297 if self._extra_debugging:
298 logging.debug(msg)
299
300
    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.

        The order of the steps below is the scheduler's core sequencing and
        must not be rearranged casually.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        # The last two steps are individually timed via the stats timer.
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000342
showard97aed502008-11-04 02:01:24 +0000343
mblighf3294cc2009-04-08 21:17:38 +0000344 def _run_cleanup(self):
345 self._periodic_cleanup.run_cleanup_maybe()
346 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000347
mbligh36768f02008-02-22 18:28:33 +0000348
showardf13a9e22009-12-18 22:54:09 +0000349 def _garbage_collection(self):
350 threshold_time = time.time() - self._seconds_between_garbage_stats
351 if threshold_time < self._last_garbage_stats_time:
352 # Don't generate these reports very often.
353 return
354
355 self._last_garbage_stats_time = time.time()
356 # Force a full level 0 collection (because we can, it doesn't hurt
357 # at this interval).
358 gc.collect()
359 logging.info('Logging garbage collector stats on tick %d.',
360 self._tick_count)
361 gc_stats._log_garbage_collector_stats()
362
363
showard170873e2009-01-07 00:22:26 +0000364 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
365 for object_id in object_ids:
366 agent_dict.setdefault(object_id, set()).add(agent)
367
368
369 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
370 for object_id in object_ids:
371 assert object_id in agent_dict
372 agent_dict[object_id].remove(agent)
373
374
    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatchers list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to 1. a dict against the queue_entry_ids given to it 2. A dict
        against the host_ids given to it. So theoretically, a host can have any
        number of agents associated with it, and each of them can have any
        special agent task, though in practice we never see > 1 agent/task per
        host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        # Back-reference so the agent can remove itself when finished.
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000395
showard170873e2009-01-07 00:22:26 +0000396
397 def get_agents_for_entry(self, queue_entry):
398 """
399 Find agents corresponding to the specified queue_entry.
400 """
showardd3dc1992009-04-22 21:01:40 +0000401 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000402
403
404 def host_has_agent(self, host):
405 """
406 Determine if there is currently an Agent present using this host.
407 """
408 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000409
410
jadmanski0afbb632008-06-06 21:10:57 +0000411 def remove_agent(self, agent):
412 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000413 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
414 agent)
415 self._unregister_agent_for_ids(self._queue_entry_agents,
416 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000417
418
showard8cc058f2009-09-08 16:26:33 +0000419 def _host_has_scheduled_special_task(self, host):
420 return bool(models.SpecialTask.objects.filter(host__id=host.id,
421 is_active=False,
422 is_complete=False))
423
424
    def _recover_processes(self):
        """Recover running autoserv processes and orphaned/pending entries.

        Step order matters: pidfiles must be registered before the drone
        manager refresh so that running processes can be matched up.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000437
showard170873e2009-01-07 00:22:26 +0000438
showardd1195652009-12-08 22:21:02 +0000439 def _create_recovery_agent_tasks(self):
440 return (self._get_queue_entry_agent_tasks()
441 + self._get_special_task_agent_tasks(is_active=True))
442
443
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            # A synchronous task may cover several entries; mark them all used.
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000480
481
showardd1195652009-12-08 22:21:02 +0000482 def _get_special_task_agent_tasks(self, is_active=False):
483 special_tasks = models.SpecialTask.objects.filter(
484 is_active=is_active, is_complete=False)
485 return [self._get_agent_task_for_special_task(task)
486 for task in special_tasks]
487
488
489 def _get_agent_task_for_queue_entry(self, queue_entry):
490 """
beeps8bb1f7d2013-08-05 01:30:09 -0700491 Construct an AgentTask instance for the given active HostQueueEntry.
492
showardd1195652009-12-08 22:21:02 +0000493 @param queue_entry: a HostQueueEntry
beeps8bb1f7d2013-08-05 01:30:09 -0700494 @return: an AgentTask to run the queue entry
showardd1195652009-12-08 22:21:02 +0000495 """
496 task_entries = queue_entry.job.get_group_entries(queue_entry)
497 self._check_for_duplicate_host_entries(task_entries)
498
499 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
500 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000501 if queue_entry.is_hostless():
502 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000503 return QueueTask(queue_entries=task_entries)
504 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
505 return GatherLogsTask(queue_entries=task_entries)
506 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
507 return FinalReparseTask(queue_entries=task_entries)
mbligh4608b002010-01-05 18:22:35 +0000508 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
509 return ArchiveResultsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000510
Dale Curtisaa513362011-03-01 17:27:44 -0800511 raise host_scheduler.SchedulerError(
512 '_get_agent_task_for_queue_entry got entry with '
513 'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000514
515
516 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000517 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
518 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000519 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000520 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000521 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000522 if using_host:
showardd1195652009-12-08 22:21:02 +0000523 self._assert_host_has_no_agent(task_entry)
524
525
526 def _assert_host_has_no_agent(self, entry):
527 """
528 @param entry: a HostQueueEntry or a SpecialTask
529 """
530 if self.host_has_agent(entry.host):
531 agent = tuple(self._host_agents.get(entry.host.id))[0]
Dale Curtisaa513362011-03-01 17:27:44 -0800532 raise host_scheduler.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000533 'While scheduling %s, host %s already has a host agent %s'
534 % (entry, entry.host, agent.task))
535
536
    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
        object contains a hqe it's passed on to the special agent task, which
        creates a HostQueueEntry and saves it as its queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        # Match the task's type against each known SpecialAgentTask class.
        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask,
                                      ResetTask, ProvisionTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000563
564
565 def _register_pidfiles(self, agent_tasks):
566 for agent_task in agent_tasks:
567 agent_task.register_necessary_pidfiles()
568
569
570 def _recover_tasks(self, agent_tasks):
571 orphans = _drone_manager.get_orphaned_autoserv_processes()
572
573 for agent_task in agent_tasks:
574 agent_task.recover()
575 if agent_task.monitor and agent_task.monitor.has_process():
576 orphans.discard(agent_task.monitor.get_process())
577 self.add_agent_task(agent_task)
578
579 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000580
581
    def _get_unassigned_entries(self, status):
        """Yield queue entries currently in *status* that have no agent.

        @param status: a models.HostQueueEntry.Status value to filter on.
        """
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                           % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry
589
590
showard6878e8b2009-07-20 22:37:45 +0000591 def _check_for_remaining_orphan_processes(self, orphans):
592 if not orphans:
593 return
594 subject = 'Unrecovered orphan autoserv processes remain'
595 message = '\n'.join(str(process) for process in orphans)
596 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000597
598 die_on_orphans = global_config.global_config.get_config_value(
599 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
600
601 if die_on_orphans:
602 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000603
showard170873e2009-01-07 00:22:26 +0000604
showard8cc058f2009-09-08 16:26:33 +0000605 def _recover_pending_entries(self):
606 for entry in self._get_unassigned_entries(
607 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000608 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000609 entry.on_pending()
610
611
    def _check_for_unrecovered_verifying_entries(self):
        """Fail startup if any Verifying entry has no pending special task.

        A Verifying host queue entry should always be backed by an incomplete
        Cleanup or Verify SpecialTask; if none exists the entry can never
        progress, so raise rather than run with stuck entries.
        """
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000630
631
showard65db3932009-10-28 19:54:35 +0000632 def _get_prioritized_special_tasks(self):
633 """
634 Returns all queued SpecialTasks prioritized for repair first, then
635 cleanup, then verify.
beeps8bb1f7d2013-08-05 01:30:09 -0700636
637 @return: list of afe.models.SpecialTasks sorted according to priority.
showard65db3932009-10-28 19:54:35 +0000638 """
639 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
640 is_complete=False,
641 host__locked=False)
642 # exclude hosts with active queue entries unless the SpecialTask is for
643 # that queue entry
showard7e67b432010-01-20 01:13:04 +0000644 queued_tasks = models.SpecialTask.objects.add_join(
showardeab66ce2009-12-23 00:03:56 +0000645 queued_tasks, 'afe_host_queue_entries', 'host_id',
646 join_condition='afe_host_queue_entries.active',
showard7e67b432010-01-20 01:13:04 +0000647 join_from_key='host_id', force_left_join=True)
showard65db3932009-10-28 19:54:35 +0000648 queued_tasks = queued_tasks.extra(
showardeab66ce2009-12-23 00:03:56 +0000649 where=['(afe_host_queue_entries.id IS NULL OR '
650 'afe_host_queue_entries.id = '
651 'afe_special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000652
showard65db3932009-10-28 19:54:35 +0000653 # reorder tasks by priority
654 task_priority_order = [models.SpecialTask.Task.REPAIR,
655 models.SpecialTask.Task.CLEANUP,
Dan Shi07e09af2013-04-12 09:31:29 -0700656 models.SpecialTask.Task.VERIFY,
Alex Millerdfff2fd2013-05-28 13:05:06 -0700657 models.SpecialTask.Task.RESET,
658 models.SpecialTask.Task.PROVISION]
showard65db3932009-10-28 19:54:35 +0000659 def task_priority_key(task):
660 return task_priority_order.index(task.task)
661 return sorted(queued_tasks, key=task_priority_key)
662
663
showard65db3932009-10-28 19:54:35 +0000664 def _schedule_special_tasks(self):
665 """
666 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700667
668 Special tasks include PreJobTasks like verify, reset and cleanup.
669 They are created through _schedule_new_jobs and associated with a hqe
670 This method translates SpecialTasks to the appropriate AgentTask and
671 adds them to the dispatchers agents list, so _handle_agents can execute
672 them.
showard65db3932009-10-28 19:54:35 +0000673 """
674 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000675 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000676 continue
showardd1195652009-12-08 22:21:02 +0000677 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000678
679
showard170873e2009-01-07 00:22:26 +0000680 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000681 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000682 # should never happen
showarded2afea2009-07-07 20:54:07 +0000683 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000684 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000685 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700686 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000687 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000688
689
jadmanski0afbb632008-06-06 21:10:57 +0000690 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000691 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700692 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000693 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000694 if self.host_has_agent(host):
695 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000696 continue
showard8cc058f2009-09-08 16:26:33 +0000697 if self._host_has_scheduled_special_task(host):
698 # host will have a special task scheduled on the next cycle
699 continue
showard170873e2009-01-07 00:22:26 +0000700 if print_message:
showardb18134f2009-03-20 20:52:18 +0000701 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000702 models.SpecialTask.objects.create(
703 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000704 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000705
706
jadmanski0afbb632008-06-06 21:10:57 +0000707 def _recover_hosts(self):
708 # recover "Repair Failed" hosts
709 message = 'Reverifying dead host %s'
710 self._reverify_hosts_where("status = 'Repair Failed'",
711 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000712
713
showard04c82c52008-05-29 19:38:12 +0000714
showardb95b1bd2008-08-15 18:11:04 +0000715 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000716 # prioritize by job priority, then non-metahost over metahost, then FIFO
jamesrenc44ae992010-02-19 00:12:54 +0000717 return list(scheduler_models.HostQueueEntry.fetch(
showardeab66ce2009-12-23 00:03:56 +0000718 joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000719 where='NOT complete AND NOT active AND status="Queued"',
showardeab66ce2009-12-23 00:03:56 +0000720 order_by='afe_jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000721
722
showard89f84db2009-03-12 20:39:13 +0000723 def _refresh_pending_queue_entries(self):
724 """
725 Lookup the pending HostQueueEntries and call our HostScheduler
726 refresh() method given that list. Return the list.
727
728 @returns A list of pending HostQueueEntries sorted in priority order.
729 """
showard63a34772008-08-18 19:32:50 +0000730 queue_entries = self._get_pending_queue_entries()
731 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000732 return []
showardb95b1bd2008-08-15 18:11:04 +0000733
showard63a34772008-08-18 19:32:50 +0000734 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000735
showard89f84db2009-03-12 20:39:13 +0000736 return queue_entries
737
738
739 def _schedule_atomic_group(self, queue_entry):
740 """
741 Schedule the given queue_entry on an atomic group of hosts.
742
743 Returns immediately if there are insufficient available hosts.
744
745 Creates new HostQueueEntries based off of queue_entry for the
746 scheduled hosts and starts them all running.
747 """
748 # This is a virtual host queue entry representing an entire
749 # atomic group, find a group and schedule their hosts.
750 group_hosts = self._host_scheduler.find_eligible_atomic_group(
751 queue_entry)
752 if not group_hosts:
753 return
showardcbe6f942009-06-17 19:33:49 +0000754
755 logging.info('Expanding atomic group entry %s with hosts %s',
756 queue_entry,
757 ', '.join(host.hostname for host in group_hosts))
jamesren883492a2010-02-12 00:45:18 +0000758
showard89f84db2009-03-12 20:39:13 +0000759 for assigned_host in group_hosts[1:]:
760 # Create a new HQE for every additional assigned_host.
jamesrenc44ae992010-02-19 00:12:54 +0000761 new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000762 new_hqe.save()
jamesren883492a2010-02-12 00:45:18 +0000763 new_hqe.set_host(assigned_host)
764 self._run_queue_entry(new_hqe)
765
766 # The first assigned host uses the original HostQueueEntry
767 queue_entry.set_host(group_hosts[0])
768 self._run_queue_entry(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000769
770
showarda9545c02009-12-18 22:44:26 +0000771 def _schedule_hostless_job(self, queue_entry):
772 self.add_agent_task(HostlessQueueTask(queue_entry))
jamesren47bd7372010-03-13 00:58:17 +0000773 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000774
775
showard89f84db2009-03-12 20:39:13 +0000776 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700777 """
778 Find any new HQEs and call schedule_pre_job_tasks for it.
779
780 This involves setting the status of the HQE and creating a row in the
781 db corresponding the the special task, through
782 scheduler_models._queue_special_task. The new db row is then added as
783 an agent to the dispatcher through _schedule_special_tasks and
784 scheduled for execution on the drone through _handle_agents.
785 """
showard89f84db2009-03-12 20:39:13 +0000786 queue_entries = self._refresh_pending_queue_entries()
787 if not queue_entries:
788 return
789
beepsb255fc52013-10-13 23:28:54 -0700790 new_hostless_jobs = 0
791 new_atomic_groups = 0
792 new_jobs_with_hosts = 0
793 new_jobs_need_hosts = 0
794
Simran Basi3f6717d2012-09-13 15:21:22 -0700795 logging.debug('Processing %d queue_entries', len(queue_entries))
showard63a34772008-08-18 19:32:50 +0000796 for queue_entry in queue_entries:
Simran Basidef92872012-09-20 13:34:34 -0700797 self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
showarde55955f2009-10-07 20:48:58 +0000798 is_unassigned_atomic_group = (
799 queue_entry.atomic_group_id is not None
800 and queue_entry.host_id is None)
jamesren883492a2010-02-12 00:45:18 +0000801
802 if queue_entry.is_hostless():
Simran Basidef92872012-09-20 13:34:34 -0700803 self._log_extra_msg('Scheduling hostless job.')
showarda9545c02009-12-18 22:44:26 +0000804 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700805 new_hostless_jobs = new_hostless_jobs + 1
jamesren883492a2010-02-12 00:45:18 +0000806 elif is_unassigned_atomic_group:
807 self._schedule_atomic_group(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700808 new_atmoic_groups = new_atomic_groups + 1
showarde55955f2009-10-07 20:48:58 +0000809 else:
beepsb255fc52013-10-13 23:28:54 -0700810 new_jobs_need_hosts = new_jobs_need_hosts + 1
jamesren883492a2010-02-12 00:45:18 +0000811 assigned_host = self._host_scheduler.schedule_entry(queue_entry)
showard65db3932009-10-28 19:54:35 +0000812 if assigned_host and not self.host_has_agent(assigned_host):
jamesren883492a2010-02-12 00:45:18 +0000813 assert assigned_host.id == queue_entry.host_id
814 self._run_queue_entry(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700815 new_jobs_with_hosts = new_jobs_with_hosts + 1
816
817 key = 'scheduler.jobs_per_tick'
818 stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
819 stats.Gauge(key).send('new_atomic_groups', new_atomic_groups)
820 stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
821 stats.Gauge(key).send('new_jobs_without_hosts',
822 new_jobs_need_hosts - new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000823
824
showard8cc058f2009-09-08 16:26:33 +0000825 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700826 """
827 Adds agents to the dispatcher.
828
829 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
830 QueueTask for example, will have a job with a control file, and
831 the agent will have methods that poll, abort and check if the queue
832 task is finished. The dispatcher runs the agent_task, as well as
833 other agents in it's _agents member, through _handle_agents, by
834 calling the Agents tick().
835
836 This method creates an agent for each HQE in one of (starting, running,
837 gathering, parsing, archiving) states, and adds it to the dispatcher so
838 it is handled by _handle_agents.
839 """
showardd1195652009-12-08 22:21:02 +0000840 for agent_task in self._get_queue_entry_agent_tasks():
841 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000842
843
844 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000845 for entry in scheduler_models.HostQueueEntry.fetch(
846 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000847 task = entry.job.schedule_delayed_callback_task(entry)
848 if task:
showardd1195652009-12-08 22:21:02 +0000849 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000850
851
jamesren883492a2010-02-12 00:45:18 +0000852 def _run_queue_entry(self, queue_entry):
Simran Basidef92872012-09-20 13:34:34 -0700853 self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
854 queue_entry)
jamesren883492a2010-02-12 00:45:18 +0000855 queue_entry.schedule_pre_job_tasks()
mblighd5c95802008-03-05 00:33:46 +0000856
857
jadmanski0afbb632008-06-06 21:10:57 +0000858 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700859 """
860 Looks through the afe_host_queue_entries for an aborted entry.
861
862 The aborted bit is set on an HQE in many ways, the most common
863 being when a user requests an abort through the frontend, which
864 results in an rpc from the afe to abort_host_queue_entries.
865 """
jamesrene7c65cb2010-06-08 20:38:10 +0000866 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000867 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700868 where='aborted=1 and complete=0'):
showardf4a2e502009-07-28 20:06:39 +0000869 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +0000870 for agent in self.get_agents_for_entry(entry):
871 agent.abort()
872 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000873 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700874 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000875 for job in jobs_to_stop:
876 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000877
878
beeps8bb1f7d2013-08-05 01:30:09 -0700879 def _find_aborted_special_tasks(self):
880 """
881 Find SpecialTasks that have been marked for abortion.
882
883 Poll the database looking for SpecialTasks that are active
884 and have been marked for abortion, then abort them.
885 """
886
887 # The completed and active bits are very important when it comes
888 # to scheduler correctness. The active bit is set through the prolog
889 # of a special task, and reset through the cleanup method of the
890 # SpecialAgentTask. The cleanup is called both through the abort and
891 # epilog. The complete bit is set in several places, and in general
892 # a hanging job will have is_active=1 is_complete=0, while a special
893 # task which completed will have is_active=0 is_complete=1. To check
894 # aborts we directly check active because the complete bit is set in
895 # several places, including the epilog of agent tasks.
896 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
897 is_aborted=True)
898 for task in aborted_tasks:
899 # There are 2 ways to get the agent associated with a task,
900 # through the host and through the hqe. A special task
901 # always needs a host, but doesn't always need a hqe.
902 for agent in self._host_agents.get(task.host.id, []):
903 if isinstance(agent.task, SpecialAgentTask):
904
905 # The epilog preforms critical actions such as
906 # queueing the next SpecialTask, requeuing the
907 # hqe etc, however it doesn't actually kill the
908 # monitor process and set the 'done' bit. Epilogs
909 # assume that the job failed, and that the monitor
910 # process has already written an exit code. The
911 # done bit is a necessary condition for
912 # _handle_agents to schedule any more special
913 # tasks against the host, and it must be set
914 # in addition to is_active, is_complete and success.
915 agent.task.epilog()
916 agent.task.abort()
917
918
showard324bf812009-01-20 23:23:38 +0000919 def _can_start_agent(self, agent, num_started_this_cycle,
920 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000921 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000922 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000923 return True
924 # don't allow any nonzero-process agents to run after we've reached a
925 # limit (this avoids starvation of many-process agents)
926 if have_reached_limit:
927 return False
928 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000929 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000930 agent.task.owner_username,
931 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000932 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000933 return False
934 # if a single agent exceeds the per-cycle throttling, still allow it to
935 # run when it's the first agent in the cycle
936 if num_started_this_cycle == 0:
937 return True
938 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +0000939 if (num_started_this_cycle + agent.task.num_processes >
940 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000941 return False
942 return True
943
944
jadmanski0afbb632008-06-06 21:10:57 +0000945 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700946 """
947 Handles agents of the dispatcher.
948
949 Appropriate Agents are added to the dispatcher through
950 _schedule_running_host_queue_entries. These agents each
951 have a task. This method runs the agents task through
952 agent.tick() leading to:
953 agent.start
954 prolog -> AgentTasks prolog
955 For each queue entry:
956 sets host status/status to Running
957 set started_on in afe_host_queue_entries
958 run -> AgentTasks run
959 Creates PidfileRunMonitor
960 Queues the autoserv command line for this AgentTask
961 via the drone manager. These commands are executed
962 through the drone managers execute actions.
963 poll -> AgentTasks/BaseAgentTask poll
964 checks the monitors exit_code.
965 Executes epilog if task is finished.
966 Executes AgentTasks _finish_task
967 finish_task is usually responsible for setting the status
968 of the HQE/host, and updating it's active and complete fileds.
969
970 agent.is_done
971 Removed the agent from the dispatchers _agents queue.
972 Is_done checks the finished bit on the agent, that is
973 set based on the Agents task. During the agents poll
974 we check to see if the monitor process has exited in
975 it's finish method, and set the success member of the
976 task based on this exit code.
977 """
jadmanski0afbb632008-06-06 21:10:57 +0000978 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000979 have_reached_limit = False
980 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -0700981 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +0000982 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -0700983 self._log_extra_msg('Processing Agent with Host Ids: %s and '
984 'queue_entry ids:%s' % (agent.host_ids,
985 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +0000986 if not agent.started:
showard324bf812009-01-20 23:23:38 +0000987 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000988 have_reached_limit):
989 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -0700990 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +0000991 continue
showardd1195652009-12-08 22:21:02 +0000992 num_started_this_cycle += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -0700993 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +0000994 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -0700995 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +0000996 if agent.is_done():
Simran Basidef92872012-09-20 13:34:34 -0700997 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +0000998 self.remove_agent(agent)
Simran Basi3f6717d2012-09-13 15:21:22 -0700999 logging.info('%d running processes. %d added this cycle.',
1000 _drone_manager.total_running_processes(),
1001 num_started_this_cycle)
mbligh36768f02008-02-22 18:28:33 +00001002
1003
showard29f7cd22009-04-29 21:16:24 +00001004 def _process_recurring_runs(self):
1005 recurring_runs = models.RecurringRun.objects.filter(
1006 start_date__lte=datetime.datetime.now())
1007 for rrun in recurring_runs:
1008 # Create job from template
1009 job = rrun.job
1010 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001011 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001012
1013 host_objects = info['hosts']
1014 one_time_hosts = info['one_time_hosts']
1015 metahost_objects = info['meta_hosts']
1016 dependencies = info['dependencies']
1017 atomic_group = info['atomic_group']
1018
1019 for host in one_time_hosts or []:
1020 this_host = models.Host.create_one_time_host(host.hostname)
1021 host_objects.append(this_host)
1022
1023 try:
1024 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001025 options=options,
showard29f7cd22009-04-29 21:16:24 +00001026 host_objects=host_objects,
1027 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001028 atomic_group=atomic_group)
1029
1030 except Exception, ex:
1031 logging.exception(ex)
1032 #TODO send email
1033
1034 if rrun.loop_count == 1:
1035 rrun.delete()
1036 else:
1037 if rrun.loop_count != 0: # if not infinite loop
1038 # calculate new start_date
1039 difference = datetime.timedelta(seconds=rrun.loop_period)
1040 rrun.start_date = rrun.start_date + difference
1041 rrun.loop_count -= 1
1042 rrun.save()
1043
1044
Simran Basia858a232012-08-21 11:04:37 -07001045SiteDispatcher = utils.import_site_class(
1046 __file__, 'autotest_lib.scheduler.site_monitor_db',
1047 'SiteDispatcher', BaseDispatcher)
1048
1049class Dispatcher(SiteDispatcher):
1050 pass
1051
1052
showard170873e2009-01-07 00:22:26 +00001053class PidfileRunMonitor(object):
1054 """
1055 Client must call either run() to start a new process or
1056 attach_to_existing_process().
1057 """
mbligh36768f02008-02-22 18:28:33 +00001058
showard170873e2009-01-07 00:22:26 +00001059 class _PidfileException(Exception):
1060 """
1061 Raised when there's some unexpected behavior with the pid file, but only
1062 used internally (never allowed to escape this class).
1063 """
mbligh36768f02008-02-22 18:28:33 +00001064
1065
showard170873e2009-01-07 00:22:26 +00001066 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001067 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001068 self._start_time = None
1069 self.pidfile_id = None
1070 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001071
1072
showard170873e2009-01-07 00:22:26 +00001073 def _add_nice_command(self, command, nice_level):
1074 if not nice_level:
1075 return command
1076 return ['nice', '-n', str(nice_level)] + command
1077
1078
1079 def _set_start_time(self):
1080 self._start_time = time.time()
1081
1082
showard418785b2009-11-23 20:19:59 +00001083 def run(self, command, working_directory, num_processes, nice_level=None,
1084 log_file=None, pidfile_name=None, paired_with_pidfile=None,
jamesren76fcf192010-04-21 20:39:50 +00001085 username=None, drone_hostnames_allowed=None):
showard170873e2009-01-07 00:22:26 +00001086 assert command is not None
1087 if nice_level is not None:
1088 command = ['nice', '-n', str(nice_level)] + command
1089 self._set_start_time()
1090 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001091 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001092 num_processes=num_processes, log_file=log_file,
jamesren76fcf192010-04-21 20:39:50 +00001093 paired_with_pidfile=paired_with_pidfile, username=username,
1094 drone_hostnames_allowed=drone_hostnames_allowed)
showard170873e2009-01-07 00:22:26 +00001095
1096
showarded2afea2009-07-07 20:54:07 +00001097 def attach_to_existing_process(self, execution_path,
jamesrenc44ae992010-02-19 00:12:54 +00001098 pidfile_name=drone_manager.AUTOSERV_PID_FILE,
showardd1195652009-12-08 22:21:02 +00001099 num_processes=None):
showard170873e2009-01-07 00:22:26 +00001100 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001101 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001102 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +00001103 if num_processes is not None:
1104 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +00001105
1106
jadmanski0afbb632008-06-06 21:10:57 +00001107 def kill(self):
showard170873e2009-01-07 00:22:26 +00001108 if self.has_process():
1109 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001110
mbligh36768f02008-02-22 18:28:33 +00001111
showard170873e2009-01-07 00:22:26 +00001112 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001113 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001114 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001115
1116
showard170873e2009-01-07 00:22:26 +00001117 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001118 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001119 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001120 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001121
1122
showard170873e2009-01-07 00:22:26 +00001123 def _read_pidfile(self, use_second_read=False):
1124 assert self.pidfile_id is not None, (
1125 'You must call run() or attach_to_existing_process()')
1126 contents = _drone_manager.get_pidfile_contents(
1127 self.pidfile_id, use_second_read=use_second_read)
1128 if contents.is_invalid():
1129 self._state = drone_manager.PidfileContents()
1130 raise self._PidfileException(contents)
1131 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001132
1133
showard21baa452008-10-21 00:08:39 +00001134 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001135 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1136 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001137 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001138 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001139
1140
1141 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001142 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001143 return
mblighbb421852008-03-11 22:36:16 +00001144
showard21baa452008-10-21 00:08:39 +00001145 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001146
showard170873e2009-01-07 00:22:26 +00001147 if self._state.process is None:
1148 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001149 return
mbligh90a549d2008-03-25 23:52:34 +00001150
showard21baa452008-10-21 00:08:39 +00001151 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001152 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001153 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001154 return
mbligh90a549d2008-03-25 23:52:34 +00001155
showard170873e2009-01-07 00:22:26 +00001156 # pid but no running process - maybe process *just* exited
1157 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001158 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001159 # autoserv exited without writing an exit code
1160 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001161 self._handle_pidfile_error(
1162 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001163
showard21baa452008-10-21 00:08:39 +00001164
1165 def _get_pidfile_info(self):
1166 """\
1167 After completion, self._state will contain:
1168 pid=None, exit_status=None if autoserv has not yet run
1169 pid!=None, exit_status=None if autoserv is running
1170 pid!=None, exit_status!=None if autoserv has completed
1171 """
1172 try:
1173 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001174 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001175 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001176
1177
showard170873e2009-01-07 00:22:26 +00001178 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001179 """\
1180 Called when no pidfile is found or no pid is in the pidfile.
1181 """
showard170873e2009-01-07 00:22:26 +00001182 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001183 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001184 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001185 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001186 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001187
1188
showard35162b02009-03-03 02:17:30 +00001189 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001190 """\
1191 Called when autoserv has exited without writing an exit status,
1192 or we've timed out waiting for autoserv to write a pid to the
1193 pidfile. In either case, we just return failure and the caller
1194 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001195
showard170873e2009-01-07 00:22:26 +00001196 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001197 """
1198 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001199 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001200 self._state.exit_status = 1
1201 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001202
1203
jadmanski0afbb632008-06-06 21:10:57 +00001204 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001205 self._get_pidfile_info()
1206 return self._state.exit_status
1207
1208
1209 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001210 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001211 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001212 if self._state.num_tests_failed is None:
1213 return -1
showard21baa452008-10-21 00:08:39 +00001214 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001215
1216
showardcdaeae82009-08-31 18:32:48 +00001217 def try_copy_results_on_drone(self, **kwargs):
1218 if self.has_process():
1219 # copy results logs into the normal place for job results
1220 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1221
1222
1223 def try_copy_to_results_repository(self, source, **kwargs):
1224 if self.has_process():
1225 _drone_manager.copy_to_results_repository(self.get_process(),
1226 source, **kwargs)
1227
1228
class Agent(object):
    """
    Wrapper the Dispatcher uses to drive a single AgentTask, associating the
    task with the queue_entry and host ids it operates on.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.  If the task succeeded.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested.  The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: An instance of an AgentTask.
        """
        self.task = task

        # Populated later by Dispatcher.add_agent().
        self.dispatcher = None

        # Mirror the ids so the Dispatcher can index agents without touching
        # the task itself.
        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        """Poll the wrapped task once; latch `finished` when it completes."""
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        """@returns True once the wrapped task has completed."""
        return self.finished


    def abort(self):
        """Ask the task to abort; mark this agent finished if it did."""
        if not self.task:
            return
        self.task.abort()
        # tasks can choose to ignore aborts
        if self.task.aborted:
            self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001284
showardd3dc1992009-04-22 21:01:40 +00001285
class BaseAgentTask(object):
    """
    Base class for tasks run by the scheduler through an Agent.

    A BaseAgentTask wraps one external process (typically autoserv, per
    the default pidfile name): it starts the process via the drone manager,
    tracks it with a PidfileRunMonitor, and runs prolog/epilog bookkeeping
    around it.  Subclasses must override _command_line(),
    _working_directory() and owner_username at minimum.
    """

    class _NullMonitor(object):
        # Stand-in returned by _paired_with_monitor() when a task has no
        # paired process: reports an always-present process and no pidfile.
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        """Record the host/queue-entry ids this task acts on.

        Prefers real queue entries when given; otherwise falls back to a
        single host (queue_entry_ids is left empty in that case).
        """
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Start the task on first poll, then tick until done."""
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        """Check the monitored process; finish once it has an exit status."""
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            # process still running (no exit status written yet)
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        """@returns True once finished() has run."""
        return self.done


    def finished(self, success):
        """Mark the task complete and run epilog().  Idempotent.

        @param success: bool, whether the task's process succeeded.
        """
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        To be overridden.

        Base behavior: register the pidfiles this task will need so the
        drone manager polls them.  Must run before a monitor exists.
        """
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        # Full path of the command-output log, or None when no log was
        # requested at construction time.
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        """Copy the command log (if any) to the results repository."""
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        """
        To be overridden.

        Base behavior: run cleanup() and log the final success state.
        """
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)


    def start(self):
        """Run prolog() and launch the process (first call only)."""
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the process (if running) and mark this task aborted."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        # All entries must share one execution path; assert rather than
        # silently picking one, since results would otherwise be split.
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                        execution_entry,
                                        first_execution_path,
                                        execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        Copy the shared execution directory to the results repository.

        @param execution_entries: list of objects with execution_path() method
        @param use_monitor: optional monitor to copy with; defaults to
                self.monitor.
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        # trailing slash: copy directory contents
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        """Hand the given entries to the parser by setting status PARSING."""
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _archive_results(self, queue_entries):
        """Hand the given entries to the archiver via status ARCHIVING."""
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)


    def _command_line(self):
        """
        Return the command line to run.  Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this BaseAgentTask's process.
        It may only be approximate.  To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this BaseAgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task.  May be None.  Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this BaseAgentTask's process executes.
        Must be overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this BaseAgentTask's process uses.  To be
        overridden if necessary.
        """
        return drone_manager.AUTOSERV_PID_FILE


    def _check_paired_results_exist(self):
        """Fail early (with a notification email) when the paired process's
        results have been lost.  @returns False if the task was failed."""
        if not self._paired_with_monitor().has_process():
            email_manager.manager.enqueue_notify_email(
                    'No paired results in task',
                    'No paired results in task %s at %s'
                    % (self, self._paired_with_monitor().pidfile_id))
            self.finished(False)
            return False
        return True


    def _create_monitor(self):
        # Exactly one monitor per task; run()/recover() both come through here.
        assert not self.monitor
        self.monitor = PidfileRunMonitor()


    def run(self):
        """Launch this task's process on a drone via the monitor."""
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.run(
                self._command_line(), self._working_directory(),
                num_processes=self.num_processes,
                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
                pidfile_name=self._pidfile_name(),
                paired_with_pidfile=self._paired_with_monitor().pidfile_id,
                username=self.owner_username,
                drone_hostnames_allowed=self.get_drone_hostnames_allowed())


    def get_drone_hostnames_allowed(self):
        """Return the set of drone hostnames this task may run on, or None
        when drone sets are disabled (meaning: no restriction)."""
        if not models.DroneSet.drone_sets_enabled():
            return None

        hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
        if not hqes:
            # Only special tasks could be missing host queue entries
            assert isinstance(self, SpecialAgentTask)
            return self._user_or_global_default_drone_set(
                    self.task, self.task.requested_by)

        job_ids = hqes.values_list('job', flat=True).distinct()
        assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
                                      "span multiple jobs")

        job = models.Job.objects.get(id=job_ids[0])
        drone_set = job.drone_set
        if not drone_set:
            return self._user_or_global_default_drone_set(job, job.user())

        return drone_set.get_drone_hostnames()


    def _user_or_global_default_drone_set(self, obj_with_owner, user):
        """
        Returns the user's default drone set, if present.

        Otherwise, returns the global default drone set.
        """
        default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
        if not user:
            logging.warn('%s had no owner; using default drone set',
                         obj_with_owner)
            return default_hostnames
        if not user.drone_set:
            logging.warn('User %s has no default drone set, using global '
                         'default', user.login)
            return default_hostnames
        return user.drone_set.get_drone_hostnames()


    def register_necessary_pidfiles(self):
        """Register this task's pidfile (and any paired pidfile) with the
        drone manager so their contents get polled."""
        pidfile_id = _drone_manager.get_pidfile_id_from(
                self._working_directory(), self._pidfile_name())
        _drone_manager.register_pidfile(pidfile_id)

        paired_pidfile_id = self._paired_with_monitor().pidfile_id
        if paired_pidfile_id:
            _drone_manager.register_pidfile(paired_pidfile_id)


    def recover(self):
        """Reattach to a process left over from a previous scheduler run.

        If no such process exists, leaves the task unstarted so it will be
        launched normally on the next poll.
        """
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.attach_to_existing_process(
                self._working_directory(), pidfile_name=self._pidfile_name(),
                num_processes=self.num_processes)
        if not self.monitor.has_process():
            # no process to recover; wait to be started normally
            self.monitor = None
            return

        self.started = True
        logging.info('Recovering process %s for %s at %s',
                     self.monitor.get_process(), type(self).__name__,
                     self._working_directory())


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        """Raise SchedulerError unless every entry (and, optionally, its
        host) is in one of the allowed states.

        @param queue_entries: HostQueueEntries to validate.
        @param allowed_hqe_statuses: acceptable entry statuses.
        @param allowed_host_statuses: acceptable host statuses, or None to
                skip the host check.
        @raises host_scheduler.SchedulerError on any violation.
        """
        class_name = self.__class__.__name__
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                raise host_scheduler.SchedulerError(
                        '%s attempting to start entry with invalid status %s: '
                        '%s' % (class_name, entry.status, entry))
            invalid_host_status = (
                    allowed_host_statuses is not None
                    and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                raise host_scheduler.SchedulerError(
                        '%s attempting to start on queue entry with invalid '
                        'host status %s: %s'
                        % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001599
1600
# Allow deployments to extend BaseAgentTask via an optional
# autotest_lib.scheduler.site_monitor_db module; when no site-specific class
# is defined there, this resolves to BaseAgentTask itself.
SiteAgentTask = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteAgentTask', BaseAgentTask)


class AgentTask(SiteAgentTask):
    # Concrete name the rest of this file subclasses; inserts any site
    # customizations between tasks and BaseAgentTask.
    pass
1607
1608
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'


    def _format_keyval(self, key, value):
        """Render a single keyval entry as a 'key=value' line."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this"""
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append one keyval line to the keyval file, paired with the
        monitored process.  No-op when the process is gone."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        line = self._format_keyval(field, value)
        _drone_manager.write_lines_to_file(
                self._keyval_path(), [line],
                paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """@returns the ('job_queued', epoch-seconds) keyval for |job|."""
        queued_epoch = int(time.mktime(job.created_on.timetuple()))
        return 'job_queued', queued_epoch


    def _write_job_finished(self):
        """Record the job-finished timestamp keyval (now, epoch seconds)."""
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach |keyval_dict| as a keyval file at |keyval_path| in this
        task's execution directory."""
        lines = [self._format_keyval(key, value)
                 for key, value in keyval_dict.iteritems()]
        # always end with a newline to allow additional keyvals to be written
        keyval_contents = '\n'.join(lines) + '\n'
        _drone_manager.attach_file_to_execution(self._working_directory(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        """Write |keyval_dict| into this task's main keyval file."""
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Write platform/labels keyvals for |host| under host_keyvals/."""
        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        all_labels = [urllib.quote(label) for label in all_labels]
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1659
1660
class SelfThrottledTask(AgentTask):
    """
    Special AgentTask subclass that maintains its own global process limit.
    """
    # Count of processes currently running across all instances of a subclass.
    _num_running_processes = 0
    # Last known limit of max processes, used to check whether
    # max processes config has been changed.
    _last_known_max_processes = 0
    # Whether an email should be sent to notifiy process limit being hit.
    _notification_on = True
    # Once the limit-hit email has been sent, stay quiet until usage drops
    # below this fraction of the limit, to avoid spamming.
    REVIVE_NOTIFICATION_THRESHOLD = 0.80


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes = cls._num_running_processes + 1


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes = cls._num_running_processes - 1


    @classmethod
    def _max_processes(cls):
        """Subclasses must supply their process limit."""
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        """@returns True while we are below the configured limit."""
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        """@returns True once run() actually launched a process."""
        return bool(self.monitor)


    def tick(self):
        """Keep retrying the start until the process count allows it; once
        running, defer to the default tick behavior."""
        if not self._process_started():
            self._try_starting_process()
        else:
            super(SelfThrottledTask, self).tick()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    @classmethod
    def _notify_process_limit_hit(cls):
        """Send an email to notify that process limit is hit."""
        if not cls._notification_on:
            return
        subject = '%s: hitting max process limit.' % cls.__name__
        message = ('Running processes/Max processes: %d/%d'
                   % (cls._num_running_processes, cls._max_processes()))
        email_manager.manager.enqueue_notify_email(subject, message)
        cls._notification_on = False


    @classmethod
    def _reset_notification_switch_if_necessary(cls):
        """Re-arm the limit-hit notification when warranted.

        The switch turns back on when either:
        1) the configured max-process limit has changed; or
        2) notification is off and running-process usage has dropped below
           REVIVE_NOTIFICATION_THRESHOLD of the limit.
        """
        if cls._last_known_max_processes != cls._max_processes():
            cls._notification_on = True
            cls._last_known_max_processes = cls._max_processes()
            return
        usage_fraction = float(cls._num_running_processes) / cls._max_processes()
        if (not cls._notification_on and
                usage_fraction < cls.REVIVE_NOTIFICATION_THRESHOLD):
            cls._notification_on = True


    def _try_starting_process(self):
        """Start the process only when under the limit; otherwise notify."""
        self._reset_notification_switch_if_necessary()
        if not self._can_run_new_process():
            self._notify_process_limit_hit()
            return

        # actually run the command
        super(SelfThrottledTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()
1762
1763
1764
class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    # Subclasses must set this to the models.SpecialTask.Task value they
    # implement (enforced in __init__).
    TASK_TYPE = None
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args):
        """
        @param task: the models.SpecialTask DB row this agent task executes.
        @param extra_command_args: extra arguments appended to the autoserv
                command line (e.g. ['-R', ...] for repair).
        """
        super(SpecialAgentTask, self).__init__()

        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        # Wrap the DB rows in scheduler_models objects for in-memory use.
        self.host = scheduler_models.Host(id=task.host.id)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = scheduler_models.HostQueueEntry(
                    id=task.queue_entry.id)

        self.task = task
        self._extra_command_args = extra_command_args


    def _keyval_path(self):
        """@returns path of this task's keyval file (see TaskWithJobKeyvals)."""
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        """Build the autoserv command line for this special task."""
        return _autoserv_command_line(self.host.hostname,
                                      self._extra_command_args,
                                      queue_entry=self.queue_entry)


    def _working_directory(self):
        """Special tasks execute in the SpecialTask row's execution path."""
        return self.task.execution_path()


    @property
    def owner_username(self):
        """Login of the requesting user, or None for system-initiated tasks."""
        if self.task.requested_by:
            return self.task.requested_by.login
        return None


    def prolog(self):
        """Mark the SpecialTask active and record host keyvals."""
        super(SpecialAgentTask, self).prolog()
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        """Fail the associated queue entry, unless it was reassigned/aborted."""
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return # entry has been aborted

        self._actually_fail_queue_entry()


    # TODO(milleral): http://crbug.com/268607
    # All this used to be a part of _fail_queue_entry.  The
    # exact semantics of when one should and should not be failing a queue
    # entry need to be worked out, because provisioning has placed us in a
    # case where we want to fail a queue entry that could be requeued,
    # which makes us fail the two above if statements, and thus
    # _fail_queue_entry() would exit early and have no effect.
    # What's left here with _actually_fail_queue_entry is a hack to be able to
    # bypass the checks and unconditionally execute the code.
    def _actually_fail_queue_entry(self):
        """Unconditionally fail the queue entry: write final keyvals, copy
        results, hand the entry to parse/archive, and cancel its remaining
        special tasks."""
        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
                self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory() + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        # Register the entry's pidfile so later stages (parse/archive) can
        # pair with this process.
        pidfile_id = _drone_manager.get_pidfile_id_from(
                self.queue_entry.execution_path(),
                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
        _drone_manager.register_pidfile(pidfile_id)

        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        else:
            self._archive_results([self.queue_entry])

        # Also fail all other special tasks that have not yet run for this HQE
        pending_tasks = models.SpecialTask.objects.filter(
                queue_entry__id=self.queue_entry.id,
                is_complete=0)
        for task in pending_tasks:
            task.finish(False)


    def cleanup(self):
        """Finish the SpecialTask row and copy/unregister its results."""
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                _drone_manager.unregister_pidfile(self.monitor.pidfile_id)


    def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
        """Remove a type of special task in all tasks, keep last one if needed.

        @param special_task_to_remove: type of special task to be removed, e.g.,
            models.SpecialTask.Task.VERIFY.
        @param keep_last_one: True to keep the last special task if its type is
            the same as of special_task_to_remove.

        """
        # Only queued (inactive, incomplete, host-only) tasks are deleted.
        queued_special_tasks = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=special_task_to_remove,
                is_active=False, is_complete=False, queue_entry=None)
        if keep_last_one:
            queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
        queued_special_tasks.delete()
1897
1898
class RepairTask(SpecialAgentTask):
    """Special task that runs autoserv with -R to repair a host."""

    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task):
        """
        @param task: the SpecialTask row; task.queue_entry, when present, is
            the queue entry to mark failed if this repair fails.
        """
        raw_protection = host_protections.Protection.get_string(
                task.host.protection)
        # normalize the protection name
        normalized_protection = host_protections.Protection.get_attr_name(
                raw_protection)

        repair_args = ['-R', '--host-protection', normalized_protection]
        super(RepairTask, self).__init__(task, repair_args)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if not self.success:
            # Repair failed: the host is out of the pool, and the entry that
            # requested the repair (if any) is failed with it.
            self.host.set_status(models.Host.Status.REPAIR_FAILED)
            if self.queue_entry:
                self._fail_queue_entry()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001935
1936
showarded2afea2009-07-07 20:54:07 +00001937class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001938 def _copy_to_results_repository(self):
1939 if not self.queue_entry or self.queue_entry.meta_host:
1940 return
1941
1942 self.queue_entry.set_execution_subdir()
1943 log_name = os.path.basename(self.task.execution_path())
1944 source = os.path.join(self.task.execution_path(), 'debug',
1945 'autoserv.DEBUG')
1946 destination = os.path.join(
1947 self.queue_entry.execution_path(), log_name)
1948
1949 self.monitor.try_copy_to_results_repository(
1950 source, destination_path=destination)
1951
1952
    def epilog(self):
        """Handle a pre-job task result.

        On failure (unless the host is DO_NOT_VERIFY, which is treated as
        success): cancel the entry's remaining pre-job tasks, requeue or fail
        the entry depending on provision/repair retry counts, and schedule a
        repair of the host.
        """
        super(PreJobTask, self).epilog()

        if self.success:
            return

        if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
            # effectively ignore failure for these hosts
            self.success = True
            return

        if self.queue_entry:
            # If we requeue a HQE, we should cancel any remaining pre-job
            # tasks against this host, otherwise we'll be left in a state
            # where a queued HQE has special tasks to run against a host.
            models.SpecialTask.objects.filter(
                    queue_entry__id=self.queue_entry.id,
                    host__id=self.host.id,
                    is_complete=0).update(is_complete=1, success=0)

            previous_provisions = models.SpecialTask.objects.filter(
                    task=models.SpecialTask.Task.PROVISION,
                    queue_entry_id=self.queue_entry.id).count()
            if (previous_provisions >
                scheduler_config.config.max_provision_retries):
                self._actually_fail_queue_entry()
                # This abort will mark the aborted bit on the HQE itself, to
                # signify that we're killing it.  Technically it also will do
                # the recursive aborting of all child jobs, but that shouldn't
                # matter here, as only suites have children, and those are
                # hostless and thus don't have provisioning.
                # TODO(milleral) http://crbug.com/188217
                # However, we can't actually do this yet, as if we set the
                # abort bit the FinalReparseTask will set the status of the HQE
                # to ABORTED, which then means that we don't show the status in
                # run_suite.  So in the meantime, don't mark the HQE as
                # aborted.
                # queue_entry.abort()
            else:
                # requeue() must come after handling provision retries, since
                # _actually_fail_queue_entry needs an execution subdir.
                # We also don't want to requeue if we hit the provision retry
                # limit, since then we overwrite the PARSING state of the HQE.
                self.queue_entry.requeue()

            # Too many failed repairs on this entry means the host is beyond
            # saving: mark it REPAIR_FAILED and fail the entry instead of
            # scheduling yet another repair.
            previous_repairs = models.SpecialTask.objects.filter(
                    task=models.SpecialTask.Task.REPAIR,
                    queue_entry_id=self.queue_entry.id).count()
            if previous_repairs >= scheduler_config.config.max_repair_limit:
                self.host.set_status(models.Host.Status.REPAIR_FAILED)
                self._fail_queue_entry()
                return

            queue_entry = models.HostQueueEntry.objects.get(
                    id=self.queue_entry.id)
        else:
            queue_entry = None

        # Schedule a repair of the host, linked to the entry (if any) so the
        # repair can fail it on give-up.
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=self.host.id),
                task=models.SpecialTask.Task.REPAIR,
                queue_entry=queue_entry,
                requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00002016
showard8fe93b52008-11-18 17:53:22 +00002017
Alex Miller42437f92013-05-28 12:58:54 -07002018 def _should_pending(self):
2019 """
2020 Decide if we should call the host queue entry's on_pending method.
2021 We should if:
2022 1) There exists an associated host queue entry.
2023 2) The current special task completed successfully.
2024 3) There do not exist any more special tasks to be run before the
2025 host queue entry starts.
2026
2027 @returns: True if we should call pending, false if not.
2028
2029 """
2030 if not self.queue_entry or not self.success:
2031 return False
2032
2033 # We know if this is the last one when we create it, so we could add
2034 # another column to the database to keep track of this information, but
2035 # I expect the overhead of querying here to be minimal.
2036 queue_entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
2037 queued = models.SpecialTask.objects.filter(
2038 host__id=self.host.id, is_active=False,
2039 is_complete=False, queue_entry=queue_entry)
2040 queued = queued.exclude(id=self.task.id)
2041 return queued.count() == 0
2042
2043
showard8fe93b52008-11-18 17:53:22 +00002044class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002045 TASK_TYPE = models.SpecialTask.Task.VERIFY
2046
2047
showardd1195652009-12-08 22:21:02 +00002048 def __init__(self, task):
2049 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00002050 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00002051
2052
jadmanski0afbb632008-06-06 21:10:57 +00002053 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002054 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00002055
showardb18134f2009-03-20 20:52:18 +00002056 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002057 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00002058 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2059 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00002060
jamesren42318f72010-05-10 23:40:59 +00002061 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00002062 # and there's no need to keep records of other requests.
Dan Shi07e09af2013-04-12 09:31:29 -07002063 self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
2064 keep_last_one=True)
showard2fe3f1d2009-07-06 20:19:11 +00002065
mbligh36768f02008-02-22 18:28:33 +00002066
jadmanski0afbb632008-06-06 21:10:57 +00002067 def epilog(self):
2068 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002069 if self.success:
Alex Miller42437f92013-05-28 12:58:54 -07002070 if self._should_pending():
showard8cc058f2009-09-08 16:26:33 +00002071 self.queue_entry.on_pending()
2072 else:
2073 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00002074
2075
mbligh4608b002010-01-05 18:22:35 +00002076class CleanupTask(PreJobTask):
2077 # note this can also run post-job, but when it does, it's running standalone
2078 # against the host (not related to the job), so it's not considered a
2079 # PostJobTask
2080
2081 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2082
2083
2084 def __init__(self, task, recover_run_monitor=None):
2085 super(CleanupTask, self).__init__(task, ['--cleanup'])
2086 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
2087
2088
2089 def prolog(self):
2090 super(CleanupTask, self).prolog()
2091 logging.info("starting cleanup task for host: %s", self.host.hostname)
2092 self.host.set_status(models.Host.Status.CLEANING)
2093 if self.queue_entry:
Dan Shi07e09af2013-04-12 09:31:29 -07002094 self.queue_entry.set_status(models.HostQueueEntry.Status.CLEANING)
mbligh4608b002010-01-05 18:22:35 +00002095
2096
2097 def _finish_epilog(self):
2098 if not self.queue_entry or not self.success:
2099 return
2100
2101 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
2102 should_run_verify = (
2103 self.queue_entry.job.run_verify
2104 and self.host.protection != do_not_verify_protection)
2105 if should_run_verify:
2106 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
2107 models.SpecialTask.objects.create(
2108 host=models.Host.objects.get(id=self.host.id),
2109 queue_entry=entry,
2110 task=models.SpecialTask.Task.VERIFY)
2111 else:
Alex Miller42437f92013-05-28 12:58:54 -07002112 if self._should_pending():
2113 self.queue_entry.on_pending()
mbligh4608b002010-01-05 18:22:35 +00002114
2115
2116 def epilog(self):
2117 super(CleanupTask, self).epilog()
2118
2119 if self.success:
2120 self.host.update_field('dirty', 0)
2121 self.host.set_status(models.Host.Status.READY)
2122
2123 self._finish_epilog()
2124
2125
Dan Shi07e09af2013-04-12 09:31:29 -07002126class ResetTask(PreJobTask):
2127 """Task to reset a DUT, including cleanup and verify."""
2128 # note this can also run post-job, but when it does, it's running standalone
2129 # against the host (not related to the job), so it's not considered a
2130 # PostJobTask
2131
2132 TASK_TYPE = models.SpecialTask.Task.RESET
2133
2134
2135 def __init__(self, task, recover_run_monitor=None):
2136 super(ResetTask, self).__init__(task, ['--reset'])
2137 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
2138
2139
2140 def prolog(self):
2141 super(ResetTask, self).prolog()
2142 logging.info('starting reset task for host: %s',
2143 self.host.hostname)
2144 self.host.set_status(models.Host.Status.RESETTING)
2145 if self.queue_entry:
2146 self.queue_entry.set_status(models.HostQueueEntry.Status.RESETTING)
2147
2148 # Delete any queued cleanups for this host.
2149 self.remove_special_tasks(models.SpecialTask.Task.CLEANUP,
2150 keep_last_one=False)
2151
2152 # Delete any queued reverifies for this host.
2153 self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
2154 keep_last_one=False)
2155
2156 # Only one reset is needed.
2157 self.remove_special_tasks(models.SpecialTask.Task.RESET,
2158 keep_last_one=True)
2159
2160
2161 def epilog(self):
2162 super(ResetTask, self).epilog()
2163
2164 if self.success:
2165 self.host.update_field('dirty', 0)
Dan Shi07e09af2013-04-12 09:31:29 -07002166
Alex Millerba076c52013-07-11 10:11:48 -07002167 if self._should_pending():
Dan Shi07e09af2013-04-12 09:31:29 -07002168 self.queue_entry.on_pending()
Alex Millerdc608d52013-07-30 14:26:21 -07002169 else:
2170 self.host.set_status(models.Host.Status.READY)
Dan Shi07e09af2013-04-12 09:31:29 -07002171
2172
Alex Millerdfff2fd2013-05-28 13:05:06 -07002173class ProvisionTask(PreJobTask):
2174 TASK_TYPE = models.SpecialTask.Task.PROVISION
2175
2176 def __init__(self, task):
2177 # Provisioning requires that we be associated with a job/queue entry
2178 assert task.queue_entry, "No HQE associated with provision task!"
2179 # task.queue_entry is an afe model HostQueueEntry object.
2180 # self.queue_entry is a scheduler models HostQueueEntry object, but
2181 # it gets constructed and assigned in __init__, so it's not available
2182 # yet. Therefore, we're stuck pulling labels off of the afe model
2183 # so that we can pass the --provision args into the __init__ call.
2184 labels = {x.name for x in task.queue_entry.job.dependency_labels.all()}
2185 _, provisionable = provision.filter_labels(labels)
2186 extra_command_args = ['--provision', ','.join(provisionable)]
2187 super(ProvisionTask, self).__init__(task, extra_command_args)
2188 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
2189
2190
2191 def _command_line(self):
2192 # If we give queue_entry to _autoserv_command_line, then it will append
2193 # -c for this invocation if the queue_entry is a client side test. We
2194 # don't want that, as it messes with provisioning, so we just drop it
2195 # from the arguments here.
2196 # Note that we also don't verify job_repo_url as provisioining tasks are
2197 # required to stage whatever content we need, and the job itself will
2198 # force autotest to be staged if it isn't already.
2199 return _autoserv_command_line(self.host.hostname,
2200 self._extra_command_args)
2201
2202
2203 def prolog(self):
2204 super(ProvisionTask, self).prolog()
2205 # add check for previous provision task and abort if exist.
2206 logging.info("starting provision task for host: %s", self.host.hostname)
2207 self.queue_entry.set_status(
2208 models.HostQueueEntry.Status.PROVISIONING)
2209 self.host.set_status(models.Host.Status.PROVISIONING)
2210
2211
2212 def epilog(self):
Alex Millera4a78ef2013-09-03 21:23:05 -07002213 super(ProvisionTask, self).epilog()
Alex Millerdfff2fd2013-05-28 13:05:06 -07002214
Alex Millera4a78ef2013-09-03 21:23:05 -07002215 if self._should_pending():
Alex Millerdfff2fd2013-05-28 13:05:06 -07002216 self.queue_entry.on_pending()
2217 else:
2218 self.host.set_status(models.Host.Status.READY)
2219
2220
showarda9545c02009-12-18 22:44:26 +00002221class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
2222 """
2223 Common functionality for QueueTask and HostlessQueueTask
2224 """
2225 def __init__(self, queue_entries):
2226 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00002227 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00002228 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00002229
2230
showard73ec0442009-02-07 02:05:20 +00002231 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00002232 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00002233
2234
jamesrenc44ae992010-02-19 00:12:54 +00002235 def _write_control_file(self, execution_path):
2236 control_path = _drone_manager.attach_file_to_execution(
2237 execution_path, self.job.control_file)
2238 return control_path
2239
2240
Aviv Keshet308e7362013-05-21 14:43:16 -07002241 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00002242 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00002243 execution_path = self.queue_entries[0].execution_path()
2244 control_path = self._write_control_file(execution_path)
2245 hostnames = ','.join(entry.host.hostname
2246 for entry in self.queue_entries
2247 if not entry.is_hostless())
2248
2249 execution_tag = self.queue_entries[0].execution_tag()
2250 params = _autoserv_command_line(
2251 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07002252 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00002253 _drone_manager.absolute_path(control_path)],
2254 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07002255 if self.job.is_image_update_job():
2256 params += ['--image', self.job.update_image_path]
2257
jamesrenc44ae992010-02-19 00:12:54 +00002258 return params
showardd1195652009-12-08 22:21:02 +00002259
2260
2261 @property
2262 def num_processes(self):
2263 return len(self.queue_entries)
2264
2265
2266 @property
2267 def owner_username(self):
2268 return self.job.owner
2269
2270
2271 def _working_directory(self):
2272 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00002273
2274
jadmanski0afbb632008-06-06 21:10:57 +00002275 def prolog(self):
showardd9205182009-04-27 20:09:55 +00002276 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00002277 keyval_dict = self.job.keyval_dict()
2278 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00002279 group_name = self.queue_entries[0].get_group_name()
2280 if group_name:
2281 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00002282 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00002283 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002284 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00002285 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00002286
2287
showard35162b02009-03-03 02:17:30 +00002288 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00002289 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00002290 _drone_manager.write_lines_to_file(error_file_path,
2291 [_LOST_PROCESS_ERROR])
2292
2293
showardd3dc1992009-04-22 21:01:40 +00002294 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00002295 if not self.monitor:
2296 return
2297
showardd9205182009-04-27 20:09:55 +00002298 self._write_job_finished()
2299
showard35162b02009-03-03 02:17:30 +00002300 if self.monitor.lost_process:
2301 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00002302
jadmanskif7fa2cc2008-10-01 14:13:23 +00002303
showardcbd74612008-11-19 21:42:02 +00002304 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00002305 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00002306 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00002307 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00002308 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00002309
2310
jadmanskif7fa2cc2008-10-01 14:13:23 +00002311 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00002312 if not self.monitor or not self.monitor.has_process():
2313 return
2314
jadmanskif7fa2cc2008-10-01 14:13:23 +00002315 # build up sets of all the aborted_by and aborted_on values
2316 aborted_by, aborted_on = set(), set()
2317 for queue_entry in self.queue_entries:
2318 if queue_entry.aborted_by:
2319 aborted_by.add(queue_entry.aborted_by)
2320 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
2321 aborted_on.add(t)
2322
2323 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00002324 # TODO(showard): this conditional is now obsolete, we just need to leave
2325 # it in temporarily for backwards compatibility over upgrades. delete
2326 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00002327 assert len(aborted_by) <= 1
2328 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00002329 aborted_by_value = aborted_by.pop()
2330 aborted_on_value = max(aborted_on)
2331 else:
2332 aborted_by_value = 'autotest_system'
2333 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00002334
showarda0382352009-02-11 23:36:43 +00002335 self._write_keyval_after_job("aborted_by", aborted_by_value)
2336 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00002337
showardcbd74612008-11-19 21:42:02 +00002338 aborted_on_string = str(datetime.datetime.fromtimestamp(
2339 aborted_on_value))
2340 self._write_status_comment('Job aborted by %s on %s' %
2341 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00002342
2343
jadmanski0afbb632008-06-06 21:10:57 +00002344 def abort(self):
showarda9545c02009-12-18 22:44:26 +00002345 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00002346 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00002347 self._finish_task()
showard21baa452008-10-21 00:08:39 +00002348
2349
jadmanski0afbb632008-06-06 21:10:57 +00002350 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00002351 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002352 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002353
2354
class QueueTask(AbstractQueueTask):
    """Runs a job's autoserv invocation against one or more real hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Sanity-check statuses, then mark hosts RUNNING and dirty."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)


    def _finish_task(self):
        """After the job, move entries to GATHERING; hosts stay RUNNING."""
        super(QueueTask, self)._finish_task()

        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        # Hosted jobs additionally ask autoserv to verify the job repo url.
        return (super(QueueTask, self)._command_line()
                + ['--verify_job_repo_url'])
2388
2389
Alex Millerc23a7f02013-08-27 17:36:42 -07002390class HostlessQueueTask(SelfThrottledTask, AbstractQueueTask):
mbligh4608b002010-01-05 18:22:35 +00002391 def __init__(self, queue_entry):
2392 super(HostlessQueueTask, self).__init__([queue_entry])
2393 self.queue_entry_ids = [queue_entry.id]
2394
2395
2396 def prolog(self):
2397 self.queue_entries[0].update_field('execution_subdir', 'hostless')
2398 super(HostlessQueueTask, self).prolog()
2399
2400
mbligh4608b002010-01-05 18:22:35 +00002401 def _finish_task(self):
2402 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00002403 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00002404
2405
Alex Millerc23a7f02013-08-27 17:36:42 -07002406 @classmethod
2407 def _max_processes(cls):
2408 return scheduler_config.config.max_hostless_processes
2409
2410
showardd3dc1992009-04-22 21:01:40 +00002411class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00002412 def __init__(self, queue_entries, log_file_name):
2413 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00002414
showardd1195652009-12-08 22:21:02 +00002415 self.queue_entries = queue_entries
2416
showardd3dc1992009-04-22 21:01:40 +00002417 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00002418 self._autoserv_monitor.attach_to_existing_process(
2419 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00002420
showardd1195652009-12-08 22:21:02 +00002421
2422 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00002423 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00002424 return 'true'
2425 return self._generate_command(
2426 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00002427
2428
2429 def _generate_command(self, results_dir):
2430 raise NotImplementedError('Subclasses must override this')
2431
2432
showardd1195652009-12-08 22:21:02 +00002433 @property
2434 def owner_username(self):
2435 return self.queue_entries[0].job.owner
2436
2437
2438 def _working_directory(self):
2439 return self._get_consistent_execution_path(self.queue_entries)
2440
2441
2442 def _paired_with_monitor(self):
2443 return self._autoserv_monitor
2444
2445
showardd3dc1992009-04-22 21:01:40 +00002446 def _job_was_aborted(self):
2447 was_aborted = None
showardd1195652009-12-08 22:21:02 +00002448 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002449 queue_entry.update_from_database()
2450 if was_aborted is None: # first queue entry
2451 was_aborted = bool(queue_entry.aborted)
2452 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
jamesren17cadd62010-06-16 23:26:55 +00002453 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
2454 for entry in self.queue_entries]
showardd3dc1992009-04-22 21:01:40 +00002455 email_manager.manager.enqueue_notify_email(
jamesren17cadd62010-06-16 23:26:55 +00002456 'Inconsistent abort state',
2457 'Queue entries have inconsistent abort state:\n' +
2458 '\n'.join(entries))
showardd3dc1992009-04-22 21:01:40 +00002459 # don't crash here, just assume true
2460 return True
2461 return was_aborted
2462
2463
showardd1195652009-12-08 22:21:02 +00002464 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002465 if self._job_was_aborted():
2466 return models.HostQueueEntry.Status.ABORTED
2467
2468 # we'll use a PidfileRunMonitor to read the autoserv exit status
2469 if self._autoserv_monitor.exit_code() == 0:
2470 return models.HostQueueEntry.Status.COMPLETED
2471 return models.HostQueueEntry.Status.FAILED
2472
2473
showardd3dc1992009-04-22 21:01:40 +00002474 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002475 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002476 queue_entry.set_status(status)
2477
2478
2479 def abort(self):
2480 # override AgentTask.abort() to avoid killing the process and ending
2481 # the task. post-job tasks continue when the job is aborted.
2482 pass
2483
2484
mbligh4608b002010-01-05 18:22:35 +00002485 def _pidfile_label(self):
2486 # '.autoserv_execute' -> 'autoserv'
2487 return self._pidfile_name()[1:-len('_execute')]
2488
2489
showard9bb960b2009-11-19 01:02:11 +00002490class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002491 """
2492 Task responsible for
2493 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2494 * copying logs to the results repository
2495 * spawning CleanupTasks for hosts, if necessary
2496 * spawning a FinalReparseTask for the job
2497 """
showardd1195652009-12-08 22:21:02 +00002498 def __init__(self, queue_entries, recover_run_monitor=None):
2499 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002500 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002501 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002502 self._set_ids(queue_entries=queue_entries)
2503
2504
Aviv Keshet308e7362013-05-21 14:43:16 -07002505 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd3dc1992009-04-22 21:01:40 +00002506 def _generate_command(self, results_dir):
2507 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002508 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00002509 return [_autoserv_path , '-p',
2510 '--pidfile-label=%s' % self._pidfile_label(),
2511 '--use-existing-results', '--collect-crashinfo',
2512 '-m', host_list, '-r', results_dir]
showardd3dc1992009-04-22 21:01:40 +00002513
2514
showardd1195652009-12-08 22:21:02 +00002515 @property
2516 def num_processes(self):
2517 return len(self.queue_entries)
2518
2519
2520 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002521 return drone_manager.CRASHINFO_PID_FILE
showardd1195652009-12-08 22:21:02 +00002522
2523
showardd3dc1992009-04-22 21:01:40 +00002524 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002525 self._check_queue_entry_statuses(
2526 self.queue_entries,
2527 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
2528 allowed_host_statuses=(models.Host.Status.RUNNING,))
showard8cc058f2009-09-08 16:26:33 +00002529
showardd3dc1992009-04-22 21:01:40 +00002530 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002531
2532
showardd3dc1992009-04-22 21:01:40 +00002533 def epilog(self):
2534 super(GatherLogsTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002535 self._parse_results(self.queue_entries)
showard9bb960b2009-11-19 01:02:11 +00002536 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002537
showard9bb960b2009-11-19 01:02:11 +00002538
2539 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002540 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002541 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002542 models.HostQueueEntry.Status.COMPLETED)
2543 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2544 else:
2545 final_success = False
2546 num_tests_failed = 0
showard9bb960b2009-11-19 01:02:11 +00002547 reboot_after = self._job.reboot_after
2548 do_reboot = (
2549 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002550 self._final_status() == models.HostQueueEntry.Status.ABORTED
jamesrendd855242010-03-02 22:23:44 +00002551 or reboot_after == model_attributes.RebootAfter.ALWAYS
2552 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
Dan Shi07e09af2013-04-12 09:31:29 -07002553 and final_success and num_tests_failed == 0)
2554 or num_tests_failed > 0)
showard9bb960b2009-11-19 01:02:11 +00002555
showardd1195652009-12-08 22:21:02 +00002556 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002557 if do_reboot:
2558 # don't pass the queue entry to the CleanupTask. if the cleanup
2559 # fails, the job doesn't care -- it's over.
2560 models.SpecialTask.objects.create(
2561 host=models.Host.objects.get(id=queue_entry.host.id),
2562 task=models.SpecialTask.Task.CLEANUP,
2563 requested_by=self._job.owner_model())
2564 else:
2565 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002566
2567
showard0bbfc212009-04-29 21:06:13 +00002568 def run(self):
showard597bfd32009-05-08 18:22:50 +00002569 autoserv_exit_code = self._autoserv_monitor.exit_code()
2570 # only run if Autoserv exited due to some signal. if we have no exit
2571 # code, assume something bad (and signal-like) happened.
2572 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002573 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002574 else:
2575 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002576
2577
Alex Millerc23a7f02013-08-27 17:36:42 -07002578class FinalReparseTask(SelfThrottledTask, PostJobTask):
showardd1195652009-12-08 22:21:02 +00002579 def __init__(self, queue_entries):
2580 super(FinalReparseTask, self).__init__(queue_entries,
2581 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002582 # don't use _set_ids, since we don't want to set the host_ids
2583 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002584
2585
2586 def _generate_command(self, results_dir):
mbligh4608b002010-01-05 18:22:35 +00002587 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd1195652009-12-08 22:21:02 +00002588 results_dir]
2589
2590
2591 @property
2592 def num_processes(self):
2593 return 0 # don't include parser processes in accounting
2594
2595
2596 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002597 return drone_manager.PARSER_PID_FILE
showardd1195652009-12-08 22:21:02 +00002598
2599
showard97aed502008-11-04 02:01:24 +00002600 @classmethod
mbligh4608b002010-01-05 18:22:35 +00002601 def _max_processes(cls):
2602 return scheduler_config.config.max_parse_processes
showard97aed502008-11-04 02:01:24 +00002603
2604
2605 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002606 self._check_queue_entry_statuses(
2607 self.queue_entries,
2608 allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
showard8cc058f2009-09-08 16:26:33 +00002609
showard97aed502008-11-04 02:01:24 +00002610 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002611
2612
2613 def epilog(self):
2614 super(FinalReparseTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002615 self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002616
2617
Alex Millerc23a7f02013-08-27 17:36:42 -07002618class ArchiveResultsTask(SelfThrottledTask, PostJobTask):
showarde1575b52010-01-15 00:21:12 +00002619 _ARCHIVING_FAILED_FILE = '.archiver_failed'
2620
mbligh4608b002010-01-05 18:22:35 +00002621 def __init__(self, queue_entries):
2622 super(ArchiveResultsTask, self).__init__(queue_entries,
2623 log_file_name='.archiving.log')
2624 # don't use _set_ids, since we don't want to set the host_ids
2625 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00002626
2627
mbligh4608b002010-01-05 18:22:35 +00002628 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002629 return drone_manager.ARCHIVER_PID_FILE
showard97aed502008-11-04 02:01:24 +00002630
2631
Aviv Keshet308e7362013-05-21 14:43:16 -07002632 # TODO: Refactor into autoserv_utils. crbug.com/243090
mbligh4608b002010-01-05 18:22:35 +00002633 def _generate_command(self, results_dir):
2634 return [_autoserv_path , '-p',
2635 '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
mblighe0cbc912010-03-11 18:03:07 +00002636 '--use-existing-results', '--control-filename=control.archive',
showard948eb302010-01-15 00:16:20 +00002637 os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
2638 'archive_results.control.srv')]
showard97aed502008-11-04 02:01:24 +00002639
2640
mbligh4608b002010-01-05 18:22:35 +00002641 @classmethod
2642 def _max_processes(cls):
2643 return scheduler_config.config.max_transfer_processes
showarda9545c02009-12-18 22:44:26 +00002644
2645
2646 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002647 self._check_queue_entry_statuses(
2648 self.queue_entries,
2649 allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
2650
2651 super(ArchiveResultsTask, self).prolog()
showarda9545c02009-12-18 22:44:26 +00002652
2653
mbligh4608b002010-01-05 18:22:35 +00002654 def epilog(self):
2655 super(ArchiveResultsTask, self).epilog()
showard4076c632010-01-15 20:28:49 +00002656 if not self.success and self._paired_with_monitor().has_process():
showarde1575b52010-01-15 00:21:12 +00002657 failed_file = os.path.join(self._working_directory(),
2658 self._ARCHIVING_FAILED_FILE)
2659 paired_process = self._paired_with_monitor().get_process()
2660 _drone_manager.write_lines_to_file(
2661 failed_file, ['Archiving failed with exit code %s'
2662 % self.monitor.exit_code()],
2663 paired_with_process=paired_process)
mbligh4608b002010-01-05 18:22:35 +00002664 self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002665
2666
mbligh36768f02008-02-22 18:28:33 +00002667if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002668 main()