#!/usr/bin/python -u
#pylint: disable-msg=C0111

"""
Autotest scheduler
"""


import datetime, optparse, os, signal
import sys, time, traceback, urllib
import logging, gc

import common
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.server import autoserv_utils
from autotest_lib.server.cros import provision
from autotest_lib.site_utils.graphite import stats

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_directory = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server')
_autoserv_path = os.path.join(_autoserv_directory, 'autoserv')
_testing_mode = False
_drone_manager = None


def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60
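
# Worked example (hypothetical value): with pidfile_timeout_mins = 5 in the
# SCHEDULER section of global_config, _get_pidfile_timeout_secs() returns 300.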


def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()
137
showardcca334f2009-03-12 20:38:34 +0000138 # Change the cwd while running to avoid issues incase we were launched from
139 # somewhere odd (such as a random NFS home directory of the person running
140 # sudo to launch us as the appropriate user).
141 os.chdir(RESULTS_DIR)
142
    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
            logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
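
# Note: the handler above only sets a flag; the main loop in
# main_without_exception_handling() checks _shutdown between ticks, so a
# SIGINT takes effect at the next tick boundary rather than mid-tick.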


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma-separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to look up the Job object.
    """
    return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
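
# Usage sketch (hypothetical host and extra argument): something like
#     cmd = _autoserv_command_line('host1', extra_args=['--verify'],
#                                  queue_entry=entry)
# returns a list whose first element is the autoserv executable; the exact
# flags are determined by autoserv_utils.autoserv_run_job_command.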


class BaseDispatcher(object):


    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def _log_extra_msg(self, msg):
        if self._extra_debugging:
            logging.debug(msg)


    def tick(self):
        """
        This is an altered version of tick() that records when each major
        step begins, so we can work out where most of the tick time is being
        spent.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()
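
    # With the default gc_stats_interval_mins of 6*60, the forced gc.collect()
    # and stats logging above happen at most once every six hours.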


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatcher's list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to 1. a dict against the queue_entry_ids given to it and 2. a
        dict against the host_ids given to it. So theoretically, a host can
        have any number of agents associated with it, and each of them can
        have any special agent task, though in practice we never see > 1
        agent/task per host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
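
    # Illustration: after adding an agent whose task carries host_ids={1} and
    # queue_entry_ids={5, 6}, the registries hold
    #     self._host_agents        == {1: set([agent])}
    #     self._queue_entry_agents == {5: set([agent]), 6: set([agent])}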


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqes in the specified states.

        Loosely this translates to taking an hqe in one of the specified
        states, say parsing, and getting an AgentTask for it, like the
        FinalReparseTask, through _get_agent_task_for_queue_entry. Each queue
        entry can only have one agent task at a time, but there might be
        multiple queue entries in the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying
        # is handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a
        SpecialAgentTask object contains an hqe, it's passed on to the special
        agent task, which creates a HostQueueEntry and saves it as its
        queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask,
                                      ResetTask, ProvisionTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                           % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify, then reset, then provision.

        @return: list of afe.models.SpecialTasks sorted according to priority.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is
        # for that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY,
                               models.SpecialTask.Task.RESET,
                               models.SpecialTask.Task.PROVISION]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)
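
    # Illustration of the ordering above: a queued REPAIR sorts with key 0 and
    # a queued VERIFY with key 2, so repairs run before cleanups, which run
    # before verifies, resets and provisions.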


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.

        Special tasks include PreJobTasks like verify, reset and cleanup.
        They are created through _schedule_new_jobs and associated with an
        hqe. This method translates SpecialTasks to the appropriate AgentTask
        and adds them to the dispatcher's agents list, so _handle_agents can
        execute them.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning', "
                "'Provisioning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))
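
    # e.g. the _recover_hosts() call below, _reverify_hosts_where(
    # "status = 'Repair Failed'"), fetches hosts with
    # where="locked = 0 AND invalid = 0 AND status = 'Repair Failed'".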


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='afe_jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Look up the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list.  Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)
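
    # e.g. for group_hosts == [h0, h1, h2], two cloned HQEs are created and
    # run for h1 and h2, while the original queue_entry runs on h0.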


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
        """
        Find any new HQEs and call schedule_pre_job_tasks for them.

        This involves setting the status of the HQE and creating a row in the
        db corresponding to the special task, through
        scheduler_models._queue_special_task. The new db row is then added as
        an agent to the dispatcher through _schedule_special_tasks and
        scheduled for execution on the drone through _handle_agents.
        """
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        new_hostless_jobs = 0
        new_atomic_groups = 0
        new_jobs_with_hosts = 0
        new_jobs_need_hosts = 0

        logging.debug('Processing %d queue_entries', len(queue_entries))
        for queue_entry in queue_entries:
            self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._log_extra_msg('Scheduling hostless job.')
                self._schedule_hostless_job(queue_entry)
                new_hostless_jobs = new_hostless_jobs + 1
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
                new_atomic_groups = new_atomic_groups + 1
            else:
                new_jobs_need_hosts = new_jobs_need_hosts + 1
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)
                    new_jobs_with_hosts = new_jobs_with_hosts + 1

        key = 'scheduler.jobs_per_tick'
        stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
        stats.Gauge(key).send('new_atomic_groups', new_atomic_groups)
        stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
        stats.Gauge(key).send('new_jobs_without_hosts',
                              new_jobs_need_hosts - new_jobs_with_hosts)


    def _schedule_running_host_queue_entries(self):
        """
        Adds agents to the dispatcher.

        Any AgentTask, like the QueueTask, is wrapped in an Agent. The
        QueueTask for example, will have a job with a control file, and
        the agent will have methods that poll, abort and check if the queue
        task is finished. The dispatcher runs the agent_task, as well as
        other agents in its _agents member, through _handle_agents, by
        calling the Agent's tick().

        This method creates an agent for each HQE in one of (starting,
        running, gathering, parsing, archiving) states, and adds it to the
        dispatcher so it is handled by _handle_agents.
        """
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
                            queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        """
        Looks through the afe_host_queue_entries for an aborted entry.

        The aborted bit is set on an HQE in many ways, the most common
        being when a user requests an abort through the frontend, which
        results in an rpc from the afe to abort_host_queue_entries.
        """
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted=1 and complete=0'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _find_aborted_special_tasks(self):
        """
        Find SpecialTasks that have been marked for abortion.

        Poll the database looking for SpecialTasks that are active
        and have been marked for abortion, then abort them.
        """

        # The completed and active bits are very important when it comes
        # to scheduler correctness. The active bit is set through the prolog
        # of a special task, and reset through the cleanup method of the
        # SpecialAgentTask. The cleanup is called both through the abort and
        # the epilog. The complete bit is set in several places, and in
        # general a hanging job will have is_active=1 is_complete=0, while a
        # special task which completed will have is_active=0 is_complete=1.
        # To check aborts we directly check active because the complete bit
        # is set in several places, including the epilog of agent tasks.
        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
                                                          is_aborted=True)
        for task in aborted_tasks:
            # There are 2 ways to get the agent associated with a task,
            # through the host and through the hqe. A special task
            # always needs a host, but doesn't always need an hqe.
            for agent in self._host_agents.get(task.host.id, []):
                if isinstance(agent.task, SpecialAgentTask):

                    # The epilog performs critical actions such as
                    # queueing the next SpecialTask, requeuing the
                    # hqe etc, however it doesn't actually kill the
                    # monitor process and set the 'done' bit. Epilogs
                    # assume that the job failed, and that the monitor
                    # process has already written an exit code. The
                    # done bit is a necessary condition for
                    # _handle_agents to schedule any more special
                    # tasks against the host, and it must be set
                    # in addition to is_active, is_complete and success.
                    agent.task.epilog()
                    agent.task.abort()


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
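
    # Worked example (hypothetical limits): with
    # max_processes_started_per_cycle = 50, a 10-process agent may start while
    # num_started_this_cycle <= 40, whereas a 60-process agent can only start
    # as the first agent of a cycle (num_started_this_cycle == 0).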


    def _handle_agents(self):
        """
        Handles agents of the dispatcher.

        Appropriate Agents are added to the dispatcher through
        _schedule_running_host_queue_entries. These agents each
        have a task. This method runs each agent's task through
        agent.tick(), leading to:
            agent.start
                prolog -> AgentTask's prolog
                          For each queue entry:
                              sets host status/status to Running
                              sets started_on in afe_host_queue_entries
                run -> AgentTask's run
                       Creates PidfileRunMonitor
                       Queues the autoserv command line for this AgentTask
                       via the drone manager. These commands are executed
                       through the drone manager's execute actions.
            poll -> AgentTask's/BaseAgentTask's poll
                    checks the monitor's exit_code.
                    Executes epilog if the task is finished.
                    Executes AgentTask's _finish_task.
                    finish_task is usually responsible for setting the status
                    of the HQE/host, and updating its active and complete
                    fields.
            agent.is_done
                Removes the agent from the dispatcher's _agents queue.
                is_done checks the finished bit on the agent, which is set
                based on the Agent's task. During the agent's poll we check
                whether the monitor process has exited in its finish method,
                and set the success member of the task based on this exit
                code.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_cycle += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes. %d added this cycle.',
                     _drone_manager.total_running_processes(),
                     num_started_this_cycle)


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)
            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
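
    # loop_count semantics above: 1 means this was the final run (the
    # template is deleted); 0 means repeat forever; any other value is
    # decremented and start_date is pushed out by loop_period seconds.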


SiteDispatcher = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'SiteDispatcher', BaseDispatcher)


class Dispatcher(SiteDispatcher):
    pass


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but
        only used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, num_processes, nice_level=None,
            log_file=None, pidfile_name=None, paired_with_pidfile=None,
            username=None, drone_hostnames_allowed=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            num_processes=num_processes, log_file=log_file,
            paired_with_pidfile=paired_with_pidfile, username=username,
            drone_hostnames_allowed=drone_hostnames_allowed)
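
    # Typical use (sketch): the scheduler either starts a fresh process, e.g.
    #     monitor = PidfileRunMonitor()
    #     monitor.run(command, working_directory, num_processes=1,
    #                 nice_level=AUTOSERV_NICE_LEVEL)
    # or re-attaches to one found during recovery via
    # attach_to_existing_process(), then polls it with has_process()/kill().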


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=drone_manager.AUTOSERV_PID_FILE,
                                   num_processes=None):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
                execution_path, pidfile_name=pidfile_name)
        if num_processes is not None:
            _drone_manager.declare_process_count(self.pidfile_id, num_processes)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
                'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
                self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents

1133
showard21baa452008-10-21 00:08:39 +00001134 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001135 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1136 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001137 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001138 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001139
1140
1141 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001142 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001143 return
mblighbb421852008-03-11 22:36:16 +00001144
showard21baa452008-10-21 00:08:39 +00001145 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001146
showard170873e2009-01-07 00:22:26 +00001147 if self._state.process is None:
1148 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001149 return
mbligh90a549d2008-03-25 23:52:34 +00001150
showard21baa452008-10-21 00:08:39 +00001151 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001152 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001153 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001154 return
mbligh90a549d2008-03-25 23:52:34 +00001155
showard170873e2009-01-07 00:22:26 +00001156 # pid but no running process - maybe process *just* exited
1157 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001158 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001159 # autoserv exited without writing an exit code
1160 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001161 self._handle_pidfile_error(
1162 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001163
showard21baa452008-10-21 00:08:39 +00001164
1165 def _get_pidfile_info(self):
1166 """\
1167 After completion, self._state will contain:
1168 pid=None, exit_status=None if autoserv has not yet run
1169 pid!=None, exit_status=None if autoserv is running
1170 pid!=None, exit_status!=None if autoserv has completed
1171 """
1172 try:
1173 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001174 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001175 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001176
1177
showard170873e2009-01-07 00:22:26 +00001178 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001179 """\
1180 Called when no pidfile is found or no pid is in the pidfile.
1181 """
showard170873e2009-01-07 00:22:26 +00001182 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001183 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001184 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001185 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001186 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001187
1188
showard35162b02009-03-03 02:17:30 +00001189 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001190 """\
1191 Called when autoserv has exited without writing an exit status,
1192 or we've timed out waiting for autoserv to write a pid to the
1193 pidfile. In either case, we just return failure and the caller
1194 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001195
showard170873e2009-01-07 00:22:26 +00001196 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001197 """
1198 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001199 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001200 self._state.exit_status = 1
1201 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001202
1203
jadmanski0afbb632008-06-06 21:10:57 +00001204 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001205 self._get_pidfile_info()
1206 return self._state.exit_status
1207
1208
1209 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001210 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001211 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001212 if self._state.num_tests_failed is None:
1213 return -1
showard21baa452008-10-21 00:08:39 +00001214 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001215
1216
showardcdaeae82009-08-31 18:32:48 +00001217 def try_copy_results_on_drone(self, **kwargs):
1218 if self.has_process():
1219 # copy results logs into the normal place for job results
1220 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1221
1222
1223 def try_copy_to_results_repository(self, source, **kwargs):
1224 if self.has_process():
1225 _drone_manager.copy_to_results_repository(self.get_process(),
1226 source, **kwargs)
1227
1228
mbligh36768f02008-02-22 18:28:33 +00001229class Agent(object):
showard77182562009-06-10 00:16:05 +00001230 """
Alex Miller47715eb2013-07-24 03:34:01 -07001231 An agent for use by the Dispatcher class to perform a task. An agent wraps
1232 around an AgentTask mainly to associate the AgentTask with the queue_entry
1233 and host ids.
showard77182562009-06-10 00:16:05 +00001234
1235 The following methods are required on all task objects:
1236 poll() - Called periodically to let the task check its status and
1237 update its internal state.
1238 is_done() - Returns True if the task is finished.
1239 abort() - Called when an abort has been requested. The task must
1240 set its aborted attribute to True if it actually aborted.
1241
1242 The following attributes are required on all task objects:
1243 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001244 success - bool, True if this task succeeded.
1245 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1246 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001247 """
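    # A minimal sketch of that interface (hypothetical class, shown for
    # illustration only; real tasks subclass AgentTask below):
    #
    #     class NoopTask(object):
    #         aborted = False
    #         success = None
    #         queue_entry_ids = ()
    #         host_ids = ()
    #
    #         def poll(self):
    #             self.success = True
    #
    #         def is_done(self):
    #             return self.success is not None
    #
    #         def abort(self):
    #             self.aborted = True
    #
    #     agent = Agent(NoopTask())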
1248
1249
showard418785b2009-11-23 20:19:59 +00001250 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001251 """
Alex Miller47715eb2013-07-24 03:34:01 -07001252 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001253 """
showard8cc058f2009-09-08 16:26:33 +00001254 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001255
showard77182562009-06-10 00:16:05 +00001256 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001257 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001258
showard8cc058f2009-09-08 16:26:33 +00001259 self.queue_entry_ids = task.queue_entry_ids
1260 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001261
showard8cc058f2009-09-08 16:26:33 +00001262 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001263 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001264
1265
jadmanski0afbb632008-06-06 21:10:57 +00001266 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001267 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001268 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001269 self.task.poll()
1270 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001271 self.finished = True
showardec113162008-05-08 00:52:49 +00001272
1273
jadmanski0afbb632008-06-06 21:10:57 +00001274 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001275 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001276
1277
showardd3dc1992009-04-22 21:01:40 +00001278 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001279 if self.task:
1280 self.task.abort()
1281 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001282 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001283 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001284
showardd3dc1992009-04-22 21:01:40 +00001285
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001286class BaseAgentTask(object):
showardd1195652009-12-08 22:21:02 +00001287 class _NullMonitor(object):
1288 pidfile_id = None
1289
1290 def has_process(self):
1291 return True
1292
1293
1294 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001295 """
showardd1195652009-12-08 22:21:02 +00001296 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001297 """
jadmanski0afbb632008-06-06 21:10:57 +00001298 self.done = False
showardd1195652009-12-08 22:21:02 +00001299 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001300 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001301 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001302 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001303 self.queue_entry_ids = []
1304 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001305 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001306
1307
1308 def _set_ids(self, host=None, queue_entries=None):
1309 if queue_entries and queue_entries != [None]:
1310 self.host_ids = [entry.host.id for entry in queue_entries]
1311 self.queue_entry_ids = [entry.id for entry in queue_entries]
1312 else:
1313 assert host
1314 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001315
1316
jadmanski0afbb632008-06-06 21:10:57 +00001317 def poll(self):
showard08a36412009-05-05 01:01:13 +00001318 if not self.started:
1319 self.start()
showardd1195652009-12-08 22:21:02 +00001320 if not self.done:
1321 self.tick()
showard08a36412009-05-05 01:01:13 +00001322
1323
1324 def tick(self):
showardd1195652009-12-08 22:21:02 +00001325 assert self.monitor
1326 exit_code = self.monitor.exit_code()
1327 if exit_code is None:
1328 return
mbligh36768f02008-02-22 18:28:33 +00001329
showardd1195652009-12-08 22:21:02 +00001330 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001331 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001332
1333
jadmanski0afbb632008-06-06 21:10:57 +00001334 def is_done(self):
1335 return self.done
mbligh36768f02008-02-22 18:28:33 +00001336
1337
jadmanski0afbb632008-06-06 21:10:57 +00001338 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001339 if self.done:
showardd1195652009-12-08 22:21:02 +00001340 assert self.started
showard08a36412009-05-05 01:01:13 +00001341 return
showardd1195652009-12-08 22:21:02 +00001342 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001343 self.done = True
1344 self.success = success
1345 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001346
1347
jadmanski0afbb632008-06-06 21:10:57 +00001348 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001349 """
1350 To be overridden.
1351 """
showarded2afea2009-07-07 20:54:07 +00001352 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001353 self.register_necessary_pidfiles()
1354
1355
1356 def _log_file(self):
1357 if not self._log_file_name:
1358 return None
1359 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001360
mbligh36768f02008-02-22 18:28:33 +00001361
jadmanski0afbb632008-06-06 21:10:57 +00001362 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001363 log_file = self._log_file()
1364 if self.monitor and log_file:
1365 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001366
1367
jadmanski0afbb632008-06-06 21:10:57 +00001368 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001369 """
1370 To be overridden.
1371 """
jadmanski0afbb632008-06-06 21:10:57 +00001372 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001373 logging.info("%s finished with success=%s", type(self).__name__,
1374 self.success)
1375
mbligh36768f02008-02-22 18:28:33 +00001376
jadmanski0afbb632008-06-06 21:10:57 +00001377 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001378 if not self.started:
1379 self.prolog()
1380 self.run()
1381
1382 self.started = True
1383
1384
1385 def abort(self):
1386 if self.monitor:
1387 self.monitor.kill()
1388 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001389 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001390 self.cleanup()
Dan Shid0e09ab2013-09-09 15:28:55 -07001391 if self.started:
1392 self.finished(success=False)
jadmanski0afbb632008-06-06 21:10:57 +00001393
1394
showarded2afea2009-07-07 20:54:07 +00001395 def _get_consistent_execution_path(self, execution_entries):
1396 first_execution_path = execution_entries[0].execution_path()
1397 for execution_entry in execution_entries[1:]:
1398 assert execution_entry.execution_path() == first_execution_path, (
1399 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1400 execution_entry,
1401 first_execution_path,
1402 execution_entries[0]))
1403 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001404
1405
showarded2afea2009-07-07 20:54:07 +00001406 def _copy_results(self, execution_entries, use_monitor=None):
1407 """
1408 @param execution_entries: list of objects with execution_path() method
1409 """
showard6d1c1432009-08-20 23:30:39 +00001410 if use_monitor is not None and not use_monitor.has_process():
1411 return
1412
showarded2afea2009-07-07 20:54:07 +00001413 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001414 if use_monitor is None:
1415 assert self.monitor
1416 use_monitor = self.monitor
1417 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001418 execution_path = self._get_consistent_execution_path(execution_entries)
1419 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001420 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001421
showarda1e74b32009-05-12 17:32:04 +00001422
1423 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001424 for queue_entry in queue_entries:
1425 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001426
1427
mbligh4608b002010-01-05 18:22:35 +00001428 def _archive_results(self, queue_entries):
1429 for queue_entry in queue_entries:
1430 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001431
1432
showardd1195652009-12-08 22:21:02 +00001433 def _command_line(self):
1434 """
1435 Return the command line to run. Must be overridden.
1436 """
1437 raise NotImplementedError
1438
1439
1440 @property
1441 def num_processes(self):
1442 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001443 Return the number of processes forked by this BaseAgentTask's process.
1444 It may only be approximate. To be overridden if necessary.
showardd1195652009-12-08 22:21:02 +00001445 """
1446 return 1
1447
1448
1449 def _paired_with_monitor(self):
1450 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001451 If this BaseAgentTask's process must run on the same machine as some
showardd1195652009-12-08 22:21:02 +00001452 previous process, this method should be overridden to return a
1453 PidfileRunMonitor for that process.
1454 """
1455 return self._NullMonitor()
1456
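    # A subclass whose process must land on the drone of an earlier
    # autoserv run might override this roughly as follows (sketch;
    # _autoserv_monitor is assumed to be a PidfileRunMonitor attached to
    # that earlier process, as PostJobTask does below):
    #
    #     def _paired_with_monitor(self):
    #         return self._autoserv_monitor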
1457
1458 @property
1459 def owner_username(self):
1460 """
1461 Return login of user responsible for this task. May be None. Must be
1462 overridden.
1463 """
1464 raise NotImplementedError
1465
1466
1467 def _working_directory(self):
1468 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001469 Return the directory where this BaseAgentTask's process executes.
1470 Must be overridden.
showardd1195652009-12-08 22:21:02 +00001471 """
1472 raise NotImplementedError
1473
1474
1475 def _pidfile_name(self):
1476 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001477 Return the name of the pidfile this BaseAgentTask's process uses. To be
showardd1195652009-12-08 22:21:02 +00001478 overridden if necessary.
1479 """
jamesrenc44ae992010-02-19 00:12:54 +00001480 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001481
1482
1483 def _check_paired_results_exist(self):
1484 if not self._paired_with_monitor().has_process():
1485 email_manager.manager.enqueue_notify_email(
1486 'No paired results in task',
1487 'No paired results in task %s at %s'
1488 % (self, self._paired_with_monitor().pidfile_id))
1489 self.finished(False)
1490 return False
1491 return True
1492
1493
1494 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001495 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001496 self.monitor = PidfileRunMonitor()
1497
1498
1499 def run(self):
1500 if not self._check_paired_results_exist():
1501 return
1502
1503 self._create_monitor()
1504 self.monitor.run(
1505 self._command_line(), self._working_directory(),
1506 num_processes=self.num_processes,
1507 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1508 pidfile_name=self._pidfile_name(),
1509 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001510 username=self.owner_username,
1511 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1512
1513
1514 def get_drone_hostnames_allowed(self):
1515 if not models.DroneSet.drone_sets_enabled():
1516 return None
1517
1518 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1519 if not hqes:
1520 # Only special tasks could be missing host queue entries
1521 assert isinstance(self, SpecialAgentTask)
1522 return self._user_or_global_default_drone_set(
1523 self.task, self.task.requested_by)
1524
1525 job_ids = hqes.values_list('job', flat=True).distinct()
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001526 assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
jamesren76fcf192010-04-21 20:39:50 +00001527 "span multiple jobs")
1528
1529 job = models.Job.objects.get(id=job_ids[0])
1530 drone_set = job.drone_set
1531 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001532 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001533
1534 return drone_set.get_drone_hostnames()
1535
1536
1537 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1538 """
1539 Returns the user's default drone set, if present.
1540
1541 Otherwise, returns the global default drone set.
1542 """
1543 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1544 if not user:
1545 logging.warn('%s had no owner; using default drone set',
1546 obj_with_owner)
1547 return default_hostnames
1548 if not user.drone_set:
1549 logging.warn('User %s has no default drone set, using global '
1550 'default', user.login)
1551 return default_hostnames
1552 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001553
1554
1555 def register_necessary_pidfiles(self):
1556 pidfile_id = _drone_manager.get_pidfile_id_from(
1557 self._working_directory(), self._pidfile_name())
1558 _drone_manager.register_pidfile(pidfile_id)
1559
1560 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1561 if paired_pidfile_id:
1562 _drone_manager.register_pidfile(paired_pidfile_id)
1563
1564
1565 def recover(self):
1566 if not self._check_paired_results_exist():
1567 return
1568
1569 self._create_monitor()
1570 self.monitor.attach_to_existing_process(
1571 self._working_directory(), pidfile_name=self._pidfile_name(),
1572 num_processes=self.num_processes)
1573 if not self.monitor.has_process():
1574 # no process to recover; wait to be started normally
1575 self.monitor = None
1576 return
1577
1578 self.started = True
Aviv Keshet225bdfe2013-03-05 10:10:08 -08001579 logging.info('Recovering process %s for %s at %s',
1580 self.monitor.get_process(), type(self).__name__,
1581 self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001582
1583
mbligh4608b002010-01-05 18:22:35 +00001584 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1585 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001586 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001587 for entry in queue_entries:
1588 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001589 raise host_scheduler.SchedulerError(
1590 '%s attempting to start entry with invalid status %s: '
1591 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001592 invalid_host_status = (
1593 allowed_host_statuses is not None
1594 and entry.host.status not in allowed_host_statuses)
1595 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001596 raise host_scheduler.SchedulerError(
1597 '%s attempting to start on queue entry with invalid '
1598 'host status %s: %s'
1599 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001600
1601
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001602SiteAgentTask = utils.import_site_class(
1603 __file__, 'autotest_lib.scheduler.site_monitor_db',
1604 'SiteAgentTask', BaseAgentTask)
1605
1606class AgentTask(SiteAgentTask):
1607 pass
1608
1609
showardd9205182009-04-27 20:09:55 +00001610class TaskWithJobKeyvals(object):
1611 """AgentTask mixin providing functionality to help with job keyval files."""
1612 _KEYVAL_FILE = 'keyval'
1613 def _format_keyval(self, key, value):
1614 return '%s=%s' % (key, value)
1615
1616
1617 def _keyval_path(self):
1618 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001619 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001620
1621
1622 def _write_keyval_after_job(self, field, value):
1623 assert self.monitor
1624 if not self.monitor.has_process():
1625 return
1626 _drone_manager.write_lines_to_file(
1627 self._keyval_path(), [self._format_keyval(field, value)],
1628 paired_with_process=self.monitor.get_process())
1629
1630
1631 def _job_queued_keyval(self, job):
1632 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1633
1634
1635 def _write_job_finished(self):
1636 self._write_keyval_after_job("job_finished", int(time.time()))
1637
1638
showarddb502762009-09-09 15:31:20 +00001639 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1640 keyval_contents = '\n'.join(self._format_keyval(key, value)
1641 for key, value in keyval_dict.iteritems())
1642 # always end with a newline to allow additional keyvals to be written
1643 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001644 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001645 keyval_contents,
1646 file_path=keyval_path)
1647
1648
1649 def _write_keyvals_before_job(self, keyval_dict):
1650 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1651
1652
1653 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001654 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001655 host.hostname)
1656 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001657 all_labels = [ urllib.quote(label) for label in all_labels ]
showarddb502762009-09-09 15:31:20 +00001658 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1659 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1660
1661
Alex Millerc23a7f02013-08-27 17:36:42 -07001662class SelfThrottledTask(AgentTask):
1663 """
1664 Special AgentTask subclass that maintains its own global process limit.
1665 """
1666 _num_running_processes = 0
Fang Deng7bcc0982013-09-05 15:26:57 -07001667 # Last known limit of max processes, used to check whether
1668 # max processes config has been changed.
1669 _last_known_max_processes = 0
1670 # Whether an email should be sent to notify that the process limit has been hit.
1671 _notification_on = True
1672 # Once the process limit is hit, an email will be sent. To prevent
1673 # spam, do not send another email until the running-process count
1674 # drops below the following fraction of the limit.
1675 REVIVE_NOTIFICATION_THRESHOLD = 0.80
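    # Worked example (numbers illustrative): with _max_processes() == 100,
    # hitting the limit sends one email and switches notifications off;
    # they switch back on only when the count falls below 0.80 * 100 = 80,
    # or when the configured limit itself changes.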
Alex Millerc23a7f02013-08-27 17:36:42 -07001676
1677
1678 @classmethod
1679 def _increment_running_processes(cls):
1680 cls._num_running_processes += 1
Dan Shid0e09ab2013-09-09 15:28:55 -07001681 stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
1682 cls._num_running_processes)
Alex Millerc23a7f02013-08-27 17:36:42 -07001683
1684
1685 @classmethod
1686 def _decrement_running_processes(cls):
1687 cls._num_running_processes -= 1
Dan Shid0e09ab2013-09-09 15:28:55 -07001688 stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
1689 cls._num_running_processes)
Alex Millerc23a7f02013-08-27 17:36:42 -07001690
1691
1692 @classmethod
1693 def _max_processes(cls):
1694 raise NotImplementedError
1695
1696
1697 @classmethod
1698 def _can_run_new_process(cls):
1699 return cls._num_running_processes < cls._max_processes()
1700
1701
1702 def _process_started(self):
1703 return bool(self.monitor)
1704
1705
1706 def tick(self):
1707 # Override tick() to keep trying to start until the process count
1708 # drops enough for us to run, then revert to the default behavior.
1709 if self._process_started():
1710 super(SelfThrottledTask, self).tick()
1711 else:
1712 self._try_starting_process()
1713
1714
1715 def run(self):
1716 # override run() to not actually run unless we can
1717 self._try_starting_process()
1718
1719
Fang Deng7bcc0982013-09-05 15:26:57 -07001720 @classmethod
1721 def _notify_process_limit_hit(cls):
1722 """Send an email to notify that process limit is hit."""
1723 if cls._notification_on:
1724 subject = '%s: hitting max process limit.' % cls.__name__
1725 message = ('Running processes/Max processes: %d/%d'
1726 % (cls._num_running_processes, cls._max_processes()))
1727 email_manager.manager.enqueue_notify_email(
1728 subject, message)
1729 cls._notification_on = False
1730
1731
1732 @classmethod
1733 def _reset_notification_switch_if_necessary(cls):
1734 """Reset _notification_on if necessary.
1735
1736 Set _notification_on to True in the following cases:
1737 1) The configured limit of max processes has changed;
1738 2) _notification_on is False and the ratio of running processes to
1739 the limit has dropped below REVIVE_NOTIFICATION_THRESHOLD.
1740
1741 """
1742 if cls._last_known_max_processes != cls._max_processes():
1743 cls._notification_on = True
1744 cls._last_known_max_processes = cls._max_processes()
1745 return
1746 percentage = float(cls._num_running_processes) / cls._max_processes()
1747 if (not cls._notification_on and
1748 percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
1749 cls._notification_on = True
1750
Alex Millerc23a7f02013-08-27 17:36:42 -07001751 def _try_starting_process(self):
Fang Deng7bcc0982013-09-05 15:26:57 -07001752 self._reset_notification_switch_if_necessary()
Alex Millerc23a7f02013-08-27 17:36:42 -07001753 if not self._can_run_new_process():
Fang Deng7bcc0982013-09-05 15:26:57 -07001754 self._notify_process_limit_hit()
Alex Millerc23a7f02013-08-27 17:36:42 -07001755 return
1756
1757 # actually run the command
1758 super(SelfThrottledTask, self).run()
1759 if self._process_started():
1760 self._increment_running_processes()
1761
1762
1763 def finished(self, success):
1764 super(SelfThrottledTask, self).finished(success)
1765 if self._process_started():
1766 self._decrement_running_processes()
1767
1768
showard8cc058f2009-09-08 16:26:33 +00001769class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001770 """
1771 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1772 """
1773
1774 TASK_TYPE = None
1775 host = None
1776 queue_entry = None
1777
showardd1195652009-12-08 22:21:02 +00001778 def __init__(self, task, extra_command_args):
1779 super(SpecialAgentTask, self).__init__()
1780
lmrb7c5d272010-04-16 06:34:04 +00001781 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001782
jamesrenc44ae992010-02-19 00:12:54 +00001783 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001784 self.queue_entry = None
1785 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001786 self.queue_entry = scheduler_models.HostQueueEntry(
1787 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001788
showarded2afea2009-07-07 20:54:07 +00001789 self.task = task
1790 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001791
1792
showard8cc058f2009-09-08 16:26:33 +00001793 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001794 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1795
1796
1797 def _command_line(self):
1798 return _autoserv_command_line(self.host.hostname,
1799 self._extra_command_args,
1800 queue_entry=self.queue_entry)
1801
1802
1803 def _working_directory(self):
1804 return self.task.execution_path()
1805
1806
1807 @property
1808 def owner_username(self):
1809 if self.task.requested_by:
1810 return self.task.requested_by.login
1811 return None
showard8cc058f2009-09-08 16:26:33 +00001812
1813
showarded2afea2009-07-07 20:54:07 +00001814 def prolog(self):
1815 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001816 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001817 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001818
1819
showardde634ee2009-01-30 01:44:24 +00001820 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001821 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001822
showard2fe3f1d2009-07-06 20:19:11 +00001823 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001824 return # don't fail metahost entries, they'll be reassigned
1825
showard2fe3f1d2009-07-06 20:19:11 +00001826 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001827 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001828 return # entry has been aborted
1829
Alex Millerdfff2fd2013-05-28 13:05:06 -07001830 self._actually_fail_queue_entry()
1831
1832
1833 # TODO(milleral): http://crbug.com/268607
1834 # All of this used to be part of _fail_queue_entry. The exact
1835 # semantics of when a queue entry should and should not be failed
1836 # still need to be worked out: provisioning has introduced a case
1837 # where we want to fail a queue entry that could be requeued, which
1838 # trips the two checks above, so _fail_queue_entry() exits early
1839 # and has no effect.
1840 # What's left here, _actually_fail_queue_entry, is a hack that
1841 # bypasses those checks and unconditionally executes the code.
1842 def _actually_fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001843 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001844 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001845 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001846 self._write_keyval_after_job(queued_key, queued_time)
1847 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001848
showard8cc058f2009-09-08 16:26:33 +00001849 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001850 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001851 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001852 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001853
showard8cc058f2009-09-08 16:26:33 +00001854 pidfile_id = _drone_manager.get_pidfile_id_from(
1855 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001856 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001857 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001858
1859 if self.queue_entry.job.parse_failed_repair:
1860 self._parse_results([self.queue_entry])
1861 else:
1862 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001863
Alex Miller23676a22013-07-03 09:03:36 -07001864 # Also fail all other special tasks that have not yet run for this HQE
1865 pending_tasks = models.SpecialTask.objects.filter(
1866 queue_entry__id=self.queue_entry.id,
1867 is_complete=0)
Alex Miller5e36ccc2013-08-03 16:31:58 -07001868 for task in pending_tasks:
1869 task.finish(False)
Alex Miller23676a22013-07-03 09:03:36 -07001870
showard8cc058f2009-09-08 16:26:33 +00001871
1872 def cleanup(self):
1873 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001874
1875 # We will consider an aborted task to be "Failed"
1876 self.task.finish(bool(self.success))
1877
showardf85a0b72009-10-07 20:48:45 +00001878 if self.monitor:
1879 if self.monitor.has_process():
1880 self._copy_results([self.task])
1881 if self.monitor.pidfile_id is not None:
1882 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001883
1884
Dan Shi07e09af2013-04-12 09:31:29 -07001885 def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
1886 """Remove a type of special task in all tasks, keep last one if needed.
1887
1888 @param special_task_to_remove: type of special task to be removed, e.g.,
1889 models.SpecialTask.Task.VERIFY.
1890 @param keep_last_one: True to keep the last special task if its type is
1891 the same as of special_task_to_remove.
1892
1893 """
1894 queued_special_tasks = models.SpecialTask.objects.filter(
1895 host__id=self.host.id,
1896 task=special_task_to_remove,
1897 is_active=False, is_complete=False, queue_entry=None)
1898 if keep_last_one:
1899 queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
1900 queued_special_tasks.delete()
1901
1902
showard8cc058f2009-09-08 16:26:33 +00001903class RepairTask(SpecialAgentTask):
1904 TASK_TYPE = models.SpecialTask.Task.REPAIR
1905
1906
showardd1195652009-12-08 22:21:02 +00001907 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001908 """\
1909 @param task: the repair SpecialTask to run; if the repair fails, any associated queue entry is marked failed.
1910 """
1911 protection = host_protections.Protection.get_string(
1912 task.host.protection)
1913 # normalize the protection name
1914 protection = host_protections.Protection.get_attr_name(protection)
1915
1916 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001917 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001918
1919 # *don't* include the queue entry in IDs -- if the queue entry is
1920 # aborted, we want to leave the repair task running
1921 self._set_ids(host=self.host)
1922
1923
1924 def prolog(self):
1925 super(RepairTask, self).prolog()
1926 logging.info("repair_task starting")
1927 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001928
1929
jadmanski0afbb632008-06-06 21:10:57 +00001930 def epilog(self):
1931 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001932
jadmanski0afbb632008-06-06 21:10:57 +00001933 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001934 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001935 else:
showard8cc058f2009-09-08 16:26:33 +00001936 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001937 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001938 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001939
1940
showarded2afea2009-07-07 20:54:07 +00001941class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001942 def _copy_to_results_repository(self):
1943 if not self.queue_entry or self.queue_entry.meta_host:
1944 return
1945
1946 self.queue_entry.set_execution_subdir()
1947 log_name = os.path.basename(self.task.execution_path())
1948 source = os.path.join(self.task.execution_path(), 'debug',
1949 'autoserv.DEBUG')
1950 destination = os.path.join(
1951 self.queue_entry.execution_path(), log_name)
1952
1953 self.monitor.try_copy_to_results_repository(
1954 source, destination_path=destination)
1955
1956
showard170873e2009-01-07 00:22:26 +00001957 def epilog(self):
1958 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001959
showard775300b2009-09-09 15:30:50 +00001960 if self.success:
1961 return
showard8fe93b52008-11-18 17:53:22 +00001962
showard775300b2009-09-09 15:30:50 +00001963 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001964 # effectively ignore failure for these hosts
1965 self.success = True
showard775300b2009-09-09 15:30:50 +00001966 return
1967
1968 if self.queue_entry:
Alex Millerf3f19452013-07-29 15:53:00 -07001969 # If we requeue a HQE, we should cancel any remaining pre-job
1970 # tasks against this host, otherwise we'll be left in a state
1971 # where a queued HQE has special tasks to run against a host.
1972 models.SpecialTask.objects.filter(
1973 queue_entry__id=self.queue_entry.id,
1974 host__id=self.host.id,
1975 is_complete=0).update(is_complete=1, success=0)
showard775300b2009-09-09 15:30:50 +00001976
Alex Millera4a78ef2013-09-03 21:23:05 -07001977 previous_provisions = models.SpecialTask.objects.filter(
1978 task=models.SpecialTask.Task.PROVISION,
1979 queue_entry_id=self.queue_entry.id).count()
Alex Miller7bcec082013-09-19 10:00:53 -07001980 if (previous_provisions >
Alex Millera4a78ef2013-09-03 21:23:05 -07001981 scheduler_config.config.max_provision_retries):
1982 self._actually_fail_queue_entry()
1983 # This abort will mark the aborted bit on the HQE itself, to
1984 # signify that we're killing it. Technically it also will do
1985 # the recursive aborting of all child jobs, but that shouldn't
1986 # matter here, as only suites have children, and those are
1987 # hostless and thus don't have provisioning.
1988 # TODO(milleral) http://crbug.com/188217
1989 # However, we can't actually do this yet, as if we set the
1990 # abort bit the FinalReparseTask will set the status of the HQE
1991 # to ABORTED, which then means that we don't show the status in
1992 # run_suite. So in the meantime, don't mark the HQE as
1993 # aborted.
1994 # queue_entry.abort()
1995 else:
1996 # requeue() must come after handling provision retries, since
1997 # _actually_fail_queue_entry needs an execution subdir.
1998 # We also don't want to requeue if we hit the provision retry
1999 # limit, since then we overwrite the PARSING state of the HQE.
2000 self.queue_entry.requeue()
2001
2002 previous_repairs = models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00002003 task=models.SpecialTask.Task.REPAIR,
Alex Millera4a78ef2013-09-03 21:23:05 -07002004 queue_entry_id=self.queue_entry.id).count()
2005 if previous_repairs >= scheduler_config.config.max_repair_limit:
showard775300b2009-09-09 15:30:50 +00002006 self.host.set_status(models.Host.Status.REPAIR_FAILED)
2007 self._fail_queue_entry()
2008 return
2009
showard9bb960b2009-11-19 01:02:11 +00002010 queue_entry = models.HostQueueEntry.objects.get(
2011 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00002012 else:
2013 queue_entry = None
2014
2015 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00002016 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00002017 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00002018 queue_entry=queue_entry,
2019 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00002020
showard8fe93b52008-11-18 17:53:22 +00002021
Alex Miller42437f92013-05-28 12:58:54 -07002022 def _should_pending(self):
2023 """
2024 Decide if we should call the host queue entry's on_pending method.
2025 We should if:
2026 1) There exists an associated host queue entry.
2027 2) The current special task completed successfully.
2028 3) There do not exist any more special tasks to be run before the
2029 host queue entry starts.
2030
2031 @returns: True if we should call pending, false if not.
2032
2033 """
2034 if not self.queue_entry or not self.success:
2035 return False
2036
2037 # We know if this is the last one when we create it, so we could add
2038 # another column to the database to keep track of this information, but
2039 # I expect the overhead of querying here to be minimal.
2040 queue_entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
2041 queued = models.SpecialTask.objects.filter(
2042 host__id=self.host.id, is_active=False,
2043 is_complete=False, queue_entry=queue_entry)
2044 queued = queued.exclude(id=self.task.id)
2045 return queued.count() == 0
2046
2047
showard8fe93b52008-11-18 17:53:22 +00002048class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002049 TASK_TYPE = models.SpecialTask.Task.VERIFY
2050
2051
showardd1195652009-12-08 22:21:02 +00002052 def __init__(self, task):
2053 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00002054 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00002055
2056
jadmanski0afbb632008-06-06 21:10:57 +00002057 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002058 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00002059
showardb18134f2009-03-20 20:52:18 +00002060 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002061 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00002062 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2063 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00002064
jamesren42318f72010-05-10 23:40:59 +00002065 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00002066 # and there's no need to keep records of other requests.
Dan Shi07e09af2013-04-12 09:31:29 -07002067 self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
2068 keep_last_one=True)
showard2fe3f1d2009-07-06 20:19:11 +00002069
mbligh36768f02008-02-22 18:28:33 +00002070
jadmanski0afbb632008-06-06 21:10:57 +00002071 def epilog(self):
2072 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002073 if self.success:
Alex Miller42437f92013-05-28 12:58:54 -07002074 if self._should_pending():
showard8cc058f2009-09-08 16:26:33 +00002075 self.queue_entry.on_pending()
2076 else:
2077 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00002078
2079
mbligh4608b002010-01-05 18:22:35 +00002080class CleanupTask(PreJobTask):
2081 # note this can also run post-job, but when it does, it's running standalone
2082 # against the host (not related to the job), so it's not considered a
2083 # PostJobTask
2084
2085 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2086
2087
2088 def __init__(self, task, recover_run_monitor=None):
2089 super(CleanupTask, self).__init__(task, ['--cleanup'])
2090 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
2091
2092
2093 def prolog(self):
2094 super(CleanupTask, self).prolog()
2095 logging.info("starting cleanup task for host: %s", self.host.hostname)
2096 self.host.set_status(models.Host.Status.CLEANING)
2097 if self.queue_entry:
Dan Shi07e09af2013-04-12 09:31:29 -07002098 self.queue_entry.set_status(models.HostQueueEntry.Status.CLEANING)
mbligh4608b002010-01-05 18:22:35 +00002099
2100
2101 def _finish_epilog(self):
2102 if not self.queue_entry or not self.success:
2103 return
2104
2105 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
2106 should_run_verify = (
2107 self.queue_entry.job.run_verify
2108 and self.host.protection != do_not_verify_protection)
2109 if should_run_verify:
2110 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
2111 models.SpecialTask.objects.create(
2112 host=models.Host.objects.get(id=self.host.id),
2113 queue_entry=entry,
2114 task=models.SpecialTask.Task.VERIFY)
2115 else:
Alex Miller42437f92013-05-28 12:58:54 -07002116 if self._should_pending():
2117 self.queue_entry.on_pending()
mbligh4608b002010-01-05 18:22:35 +00002118
2119
2120 def epilog(self):
2121 super(CleanupTask, self).epilog()
2122
2123 if self.success:
2124 self.host.update_field('dirty', 0)
2125 self.host.set_status(models.Host.Status.READY)
2126
2127 self._finish_epilog()
2128
2129
Dan Shi07e09af2013-04-12 09:31:29 -07002130class ResetTask(PreJobTask):
2131 """Task to reset a DUT, including cleanup and verify."""
2132 # note this can also run post-job, but when it does, it's running standalone
2133 # against the host (not related to the job), so it's not considered a
2134 # PostJobTask
2135
2136 TASK_TYPE = models.SpecialTask.Task.RESET
2137
2138
2139 def __init__(self, task, recover_run_monitor=None):
2140 super(ResetTask, self).__init__(task, ['--reset'])
2141 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
2142
2143
2144 def prolog(self):
2145 super(ResetTask, self).prolog()
2146 logging.info('starting reset task for host: %s',
2147 self.host.hostname)
2148 self.host.set_status(models.Host.Status.RESETTING)
2149 if self.queue_entry:
2150 self.queue_entry.set_status(models.HostQueueEntry.Status.RESETTING)
2151
2152 # Delete any queued cleanups for this host.
2153 self.remove_special_tasks(models.SpecialTask.Task.CLEANUP,
2154 keep_last_one=False)
2155
2156 # Delete any queued reverifies for this host.
2157 self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
2158 keep_last_one=False)
2159
2160 # Only one reset is needed.
2161 self.remove_special_tasks(models.SpecialTask.Task.RESET,
2162 keep_last_one=True)
2163
2164
2165 def epilog(self):
2166 super(ResetTask, self).epilog()
2167
2168 if self.success:
2169 self.host.update_field('dirty', 0)
Dan Shi07e09af2013-04-12 09:31:29 -07002170
Alex Millerba076c52013-07-11 10:11:48 -07002171 if self._should_pending():
Dan Shi07e09af2013-04-12 09:31:29 -07002172 self.queue_entry.on_pending()
Alex Millerdc608d52013-07-30 14:26:21 -07002173 else:
2174 self.host.set_status(models.Host.Status.READY)
Dan Shi07e09af2013-04-12 09:31:29 -07002175
2176
Alex Millerdfff2fd2013-05-28 13:05:06 -07002177class ProvisionTask(PreJobTask):
2178 TASK_TYPE = models.SpecialTask.Task.PROVISION
2179
2180 def __init__(self, task):
2181 # Provisioning requires that we be associated with a job/queue entry
2182 assert task.queue_entry, "No HQE associated with provision task!"
2183 # task.queue_entry is an afe model HostQueueEntry object.
2184 # self.queue_entry is a scheduler_models HostQueueEntry object, but
2185 # it gets constructed and assigned in the superclass __init__, so it's
2186 # not available yet. Therefore, we're stuck pulling labels off of the
2187 # afe model so that we can pass the --provision args to that call.
2188 labels = {x.name for x in task.queue_entry.job.dependency_labels.all()}
2189 _, provisionable = provision.filter_labels(labels)
2190 extra_command_args = ['--provision', ','.join(provisionable)]
2191 super(ProvisionTask, self).__init__(task, extra_command_args)
2192 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
2193
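    # Illustrative example (label names hypothetical): given dependency
    # labels {'board:lumpy', 'cros-version:lumpy-release/R28-1234.0.0'},
    # provision.filter_labels() would keep only the provisionable
    # 'cros-version:...' label, so extra_command_args becomes
    # ['--provision', 'cros-version:lumpy-release/R28-1234.0.0'].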
2194
2195 def _command_line(self):
2196 # If we give queue_entry to _autoserv_command_line, then it will append
2197 # -c for this invocation if the queue_entry is a client side test. We
2198 # don't want that, as it messes with provisioning, so we just drop it
2199 # from the arguments here.
2200 # Note that we also don't verify job_repo_url as provisioning tasks are
2201 # required to stage whatever content we need, and the job itself will
2202 # force autotest to be staged if it isn't already.
2203 return _autoserv_command_line(self.host.hostname,
2204 self._extra_command_args)
2205
2206
2207 def prolog(self):
2208 super(ProvisionTask, self).prolog()
2209 # TODO: add a check for a previous provision task and abort if one exists.
2210 logging.info("starting provision task for host: %s", self.host.hostname)
2211 self.queue_entry.set_status(
2212 models.HostQueueEntry.Status.PROVISIONING)
2213 self.host.set_status(models.Host.Status.PROVISIONING)
2214
2215
2216 def epilog(self):
Alex Millera4a78ef2013-09-03 21:23:05 -07002217 super(ProvisionTask, self).epilog()
Alex Millerdfff2fd2013-05-28 13:05:06 -07002218
Alex Millera4a78ef2013-09-03 21:23:05 -07002219 if self._should_pending():
Alex Millerdfff2fd2013-05-28 13:05:06 -07002220 self.queue_entry.on_pending()
2221 else:
2222 self.host.set_status(models.Host.Status.READY)
2223
2224
showarda9545c02009-12-18 22:44:26 +00002225class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
2226 """
2227 Common functionality for QueueTask and HostlessQueueTask
2228 """
2229 def __init__(self, queue_entries):
2230 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00002231 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00002232 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00002233
2234
showard73ec0442009-02-07 02:05:20 +00002235 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00002236 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00002237
2238
jamesrenc44ae992010-02-19 00:12:54 +00002239 def _write_control_file(self, execution_path):
2240 control_path = _drone_manager.attach_file_to_execution(
2241 execution_path, self.job.control_file)
2242 return control_path
2243
2244
Aviv Keshet308e7362013-05-21 14:43:16 -07002245 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00002246 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00002247 execution_path = self.queue_entries[0].execution_path()
2248 control_path = self._write_control_file(execution_path)
2249 hostnames = ','.join(entry.host.hostname
2250 for entry in self.queue_entries
2251 if not entry.is_hostless())
2252
2253 execution_tag = self.queue_entries[0].execution_tag()
2254 params = _autoserv_command_line(
2255 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07002256 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00002257 _drone_manager.absolute_path(control_path)],
2258 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07002259 if self.job.is_image_update_job():
2260 params += ['--image', self.job.update_image_path]
2261
jamesrenc44ae992010-02-19 00:12:54 +00002262 return params
showardd1195652009-12-08 22:21:02 +00002263
2264
2265 @property
2266 def num_processes(self):
2267 return len(self.queue_entries)
2268
2269
2270 @property
2271 def owner_username(self):
2272 return self.job.owner
2273
2274
2275 def _working_directory(self):
2276 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00002277
2278
jadmanski0afbb632008-06-06 21:10:57 +00002279 def prolog(self):
showardd9205182009-04-27 20:09:55 +00002280 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00002281 keyval_dict = self.job.keyval_dict()
2282 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00002283 group_name = self.queue_entries[0].get_group_name()
2284 if group_name:
2285 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00002286 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00002287 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002288 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00002289 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00002290
2291
showard35162b02009-03-03 02:17:30 +00002292 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00002293 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00002294 _drone_manager.write_lines_to_file(error_file_path,
2295 [_LOST_PROCESS_ERROR])
2296
2297
showardd3dc1992009-04-22 21:01:40 +00002298 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00002299 if not self.monitor:
2300 return
2301
showardd9205182009-04-27 20:09:55 +00002302 self._write_job_finished()
2303
showard35162b02009-03-03 02:17:30 +00002304 if self.monitor.lost_process:
2305 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00002306
jadmanskif7fa2cc2008-10-01 14:13:23 +00002307
showardcbd74612008-11-19 21:42:02 +00002308 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00002309 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00002310 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00002311 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00002312 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00002313
2314
    def _log_abort(self):
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted_by value and write it out
        # TODO(showard): this conditional is now obsolete, we just need to leave
        # it in temporarily for backwards compatibility over upgrades. delete
        # soon.
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
                aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(AbstractQueueTask, self).epilog()
        self._finish_task()


class QueueTask(AbstractQueueTask):
    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.host.set_status(models.Host.Status.RUNNING)
            queue_entry.host.update_field('dirty', 1)


    def _finish_task(self):
        super(QueueTask, self)._finish_task()

        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
            queue_entry.host.set_status(models.Host.Status.RUNNING)


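    # QueueTask only appends --verify_job_repo_url to the autoserv invocation
    # built by the base class; roughly, the flag asks autoserv to check each
    # host's job_repo_url attribute before running (autoserv's option parser
    # is authoritative). Illustrative result, with base-class args elided:
    #   ['autoserv', '-p', ..., '--verify_job_repo_url']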
    def _command_line(self):
        invocation = super(QueueTask, self)._command_line()
        return invocation + ['--verify_job_repo_url']


class HostlessQueueTask(SelfThrottledTask, AbstractQueueTask):
    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


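    # A hostless entry has no host to name its results subdirectory after,
    # so the execution_subdir is pinned to the literal string 'hostless'.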
    def prolog(self):
        self.queue_entries[0].update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()
        self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_hostless_processes


class PostJobTask(AgentTask):
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


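    # In testing mode no real work should be spawned, so the command is the
    # shell no-op 'true' (exits 0 immediately); otherwise subclasses supply
    # the actual command via _generate_command().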
    def _command_line(self):
        if _testing_mode:
            return 'true'
        return self._generate_command(
                _drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None:  # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted):  # subsequent entries
                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state:\n' +
                        '\n'.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


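    # The final HQE status is resolved in priority order (a summary of the
    # logic below, not new behavior):
    #   any entry aborted           -> ABORTED
    #   autoserv exit code == 0     -> COMPLETED
    #   anything else (e.g. crash)  -> FAILED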
    def _final_status(self):
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass


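    # Illustrative mappings, assuming the pidfile names below all follow the
    # '.<label>_execute' convention:
    #   '.autoserv_execute'          -> 'autoserv'
    #   '.collect_crashinfo_execute' -> 'collect_crashinfo'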
    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]


class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
                queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


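    # Worked example of the do_reboot decision below: with reboot_after set
    # to neither ALWAYS nor IF_ALL_TESTS_PASSED, a COMPLETED final status,
    # and num_tests_failed == 2, do_reboot is still True -- any test failure
    # forces a cleanup/reboot regardless of the job's reboot_after setting.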
    def _reboot_hosts(self):
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0
        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0)
                or num_tests_failed > 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


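    # os.WIFSIGNALED() inspects a raw wait()-style status word; e.g. for a
    # process killed by SIGKILL, os.WIFSIGNALED(status) is True and
    # os.WTERMSIG(status) == signal.SIGKILL. A None exit code means the
    # process disappeared without reporting a status, which is treated the
    # same as a signal death.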
    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)


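# FinalReparseTask mixes in SelfThrottledTask (defined earlier in this file),
# which -- roughly -- keeps a per-class count of running processes and defers
# run() while that count is at _max_processes(). The mixin's definition is
# authoritative; this is just a pointer.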
class FinalReparseTask(SelfThrottledTask, PostJobTask):
    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                results_dir]


    @property
    def num_processes(self):
        return 0  # don't include parser processes in accounting


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)


class ArchiveResultsTask(SelfThrottledTask, PostJobTask):
    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                             'archive_results.control.srv')]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


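    # On archiver failure, epilog() drops a '.archiver_failed' marker into
    # the results dir; pairing the write with the archiver process should,
    # per drone_manager's pairing semantics, place the file on the drone
    # that ran the archiver.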
    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            paired_process = self._paired_with_monitor().get_process()
            _drone_manager.write_lines_to_file(
                    failed_file, ['Archiving failed with exit code %s'
                                  % self.monitor.exit_code()],
                    paired_with_process=paired_process)
        self._set_all_statuses(self._final_status())


if __name__ == '__main__':
    main()