blob: 1f9e6d561d1849bf6b5d8d02af1e3b6a4717eb79 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
Aviv Keshet225bdfe2013-03-05 10:10:08 -08002#pylint: disable-msg=C0111
mbligh36768f02008-02-22 18:28:33 +00003
4"""
5Autotest scheduler
6"""
showard909c7a62008-07-15 21:52:38 +00007
mbligh36768f02008-02-22 18:28:33 +00008
Aviv Keshet225bdfe2013-03-05 10:10:08 -08009import datetime, optparse, os, signal
10import sys, time, traceback, urllib
11import logging, gc
showard402934a2009-12-21 22:20:47 +000012
Alex Miller05d7b4c2013-03-04 07:49:38 -080013import common
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000015
16import django.db
17
showard136e6dc2009-06-10 19:38:49 +000018from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000019from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000020from autotest_lib.database import database_connection
jamesrendd855242010-03-02 22:23:44 +000021from autotest_lib.frontend.afe import model_attributes
Alex Miller05d7b4c2013-03-04 07:49:38 -080022from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000023from autotest_lib.scheduler import drone_manager, drones, email_manager
Dale Curtisaa513362011-03-01 17:27:44 -080024from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
Alex Miller05d7b4c2013-03-04 07:49:38 -080025from autotest_lib.scheduler import scheduler_logging_config
jamesrenc44ae992010-02-19 00:12:54 +000026from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080027from autotest_lib.scheduler import status_server, scheduler_config
Aviv Keshet308e7362013-05-21 14:43:16 -070028from autotest_lib.server import autoserv_utils
Alex Millerdfff2fd2013-05-28 13:05:06 -070029from autotest_lib.server.cros import provision
Fang Deng1d6c2a02013-04-17 15:25:45 -070030from autotest_lib.site_utils.graphite import stats
Alex Miller05d7b4c2013-03-04 07:49:38 -080031
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the install location to be overridden from the environment.
# dict.has_key() is deprecated (and removed in Python 3); membership
# testing with 'in' is the portable equivalent.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)
# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level scheduler state, bound in initialize()/main().
_db = None              # DatabaseConnection to the AUTOTEST_WEB database
_shutdown = False       # set True by handle_sigint(); checked once per tick
_autoserv_directory = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server')
_autoserv_path = os.path.join(_autoserv_directory, 'autoserv')
_testing_mode = False   # set by --test: dummy autoserv + stresstest database
_drone_manager = None   # drone_manager singleton, set by initialize_globals()
mbligh36768f02008-02-22 18:28:33 +000061
def _parser_path_default(install_dir):
    """Default location of the TKO results parser under *install_dir*."""
    return os.path.join(install_dir, 'tko', 'parse')

# Allow a site-specific override of the parser path; falls back to the
# default above when no site_monitor_db module provides 'parser_path'.
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
68
mbligh36768f02008-02-22 18:28:33 +000069
def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * timeout_mins
75
76
mbligh83c1e9e2009-05-01 23:10:41 +000077def _site_init_monitor_db_dummy():
78 return {}
79
80
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled without a default."""
    drone_sets_on = models.DroneSet.drone_sets_enabled()
    if drone_sets_on and not models.DroneSet.default_drone_set_name():
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000086
87
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler.

    Currently only checks the drone-set configuration.
    """
    _verify_default_drone_set_exists()
91
92
def main():
    """Scheduler entry point.

    Runs the real main loop, logging any exception that escapes
    (SystemExit is re-raised untouched so normal exits stay silent) and
    always removing the scheduler pid file on the way out.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            # Bare except is deliberate: log absolutely anything that
            # escapes, then re-raise so the process still dies.
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000104
105
def main_without_exception_handling():
    """Parse arguments, start the status server and run the tick loop.

    Usage: monitor_db [options] results_dir.  Exits early (status 1) when
    the scheduler is disabled in the global config.  On loop exit (signal,
    status-server shutdown request, or uncaught exception) flushes queued
    emails and shuts down the drone manager and database connection.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Optional site hook, defaulting to a no-op when absent.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop: one dispatcher tick per iteration until a shutdown is
        # requested via SIGINT or the status server.
        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000172
173
def setup_logging():
    """Configure scheduler logging, honoring env-var overrides for the
    log directory and file name."""
    env = os.environ
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(),
            log_dir=env.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            logfile_name=env.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
180
181
def handle_sigint(signum, frame):
    """SIGINT handler: request a clean shutdown.

    Sets the module-level _shutdown flag, which the main loop checks once
    per tick, so the scheduler finishes the current tick before exiting.
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000186
187
def initialize():
    """One-time scheduler startup: pid file, database, signals, drones.

    Aborts (exit 1) if another monitor_db instance already holds the pid
    file.  In testing mode, points the config at the stresstest database
    before connecting.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    # NOTE: this local 'drones' (a config string) shadows the imported
    # 'drones' module for the rest of this function.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000225
226
def initialize_globals():
    """Bind the module-level _drone_manager to the singleton instance."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
230
231
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the command line to launch autoserv for a job.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns The autoserv command line as a list of executable + parameters.
    """
    return autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
showard87ba02a2009-04-20 19:37:32 +0000249
250
Simran Basia858a232012-08-21 11:04:37 -0700251class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800252
253
    def __init__(self):
        # Live Agent objects, plus indexes from host id / queue-entry id to
        # the set of agents touching that object.
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        # Garbage-collection reporting cadence; default every 6 hours.
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        # Optional debug logging knobs, both off by default.
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)
mbligh36768f02008-02-22 18:28:33 +0000276
mbligh36768f02008-02-22 18:28:33 +0000277
    def initialize(self, recover_hosts=True):
        """Recover scheduler state left over from a previous run.

        @param recover_hosts: when True, also attempt host recovery after
                process recovery.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()
289
mbligh36768f02008-02-22 18:28:33 +0000290
Simran Basi0ec94dd2012-08-28 09:50:10 -0700291 def _log_tick_msg(self, msg):
292 if self._tick_debug:
293 logging.debug(msg)
294
295
Simran Basidef92872012-09-20 13:34:34 -0700296 def _log_extra_msg(self, msg):
297 if self._extra_debugging:
298 logging.debug(msg)
299
300
    def tick(self):
        """
        Run one scheduler iteration.

        Each major step is announced through _log_tick_msg so slow ticks can
        be attributed to a phase; the email-flush and Django query-reset
        phases are additionally timed via the stats.Timer.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000342
showard97aed502008-11-04 02:01:24 +0000343
mblighf3294cc2009-04-08 21:17:38 +0000344 def _run_cleanup(self):
345 self._periodic_cleanup.run_cleanup_maybe()
346 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000347
mbligh36768f02008-02-22 18:28:33 +0000348
showardf13a9e22009-12-18 22:54:09 +0000349 def _garbage_collection(self):
350 threshold_time = time.time() - self._seconds_between_garbage_stats
351 if threshold_time < self._last_garbage_stats_time:
352 # Don't generate these reports very often.
353 return
354
355 self._last_garbage_stats_time = time.time()
356 # Force a full level 0 collection (because we can, it doesn't hurt
357 # at this interval).
358 gc.collect()
359 logging.info('Logging garbage collector stats on tick %d.',
360 self._tick_count)
361 gc_stats._log_garbage_collector_stats()
362
363
showard170873e2009-01-07 00:22:26 +0000364 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
365 for object_id in object_ids:
366 agent_dict.setdefault(object_id, set()).add(agent)
367
368
369 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
370 for object_id in object_ids:
371 assert object_id in agent_dict
372 agent_dict[object_id].remove(agent)
373
374
showardd1195652009-12-08 22:21:02 +0000375 def add_agent_task(self, agent_task):
beeps8bb1f7d2013-08-05 01:30:09 -0700376 """
377 Creates and adds an agent to the dispatchers list.
378
379 In creating the agent we also pass on all the queue_entry_ids and
380 host_ids from the special agent task. For every agent we create, we
381 add it to 1. a dict against the queue_entry_ids given to it 2. A dict
382 against the host_ids given to it. So theoritically, a host can have any
383 number of agents associated with it, and each of them can have any
384 special agent task, though in practice we never see > 1 agent/task per
385 host at any time.
386
387 @param agent_task: A SpecialTask for the agent to manage.
388 """
showardd1195652009-12-08 22:21:02 +0000389 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000390 self._agents.append(agent)
391 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000392 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
393 self._register_agent_for_ids(self._queue_entry_agents,
394 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000395
showard170873e2009-01-07 00:22:26 +0000396
397 def get_agents_for_entry(self, queue_entry):
398 """
399 Find agents corresponding to the specified queue_entry.
400 """
showardd3dc1992009-04-22 21:01:40 +0000401 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000402
403
404 def host_has_agent(self, host):
405 """
406 Determine if there is currently an Agent present using this host.
407 """
408 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000409
410
jadmanski0afbb632008-06-06 21:10:57 +0000411 def remove_agent(self, agent):
412 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000413 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
414 agent)
415 self._unregister_agent_for_ids(self._queue_entry_agents,
416 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000417
418
showard8cc058f2009-09-08 16:26:33 +0000419 def _host_has_scheduled_special_task(self, host):
420 return bool(models.SpecialTask.objects.filter(host__id=host.id,
421 is_active=False,
422 is_complete=False))
423
424
    def _recover_processes(self):
        """Reattach to autoserv processes left over from a previous run.

        Builds recovery agent tasks, registers their pidfiles before the
        drone refresh so the refresh can pick them up, then recovers tasks
        and pending/verifying entries.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000437
showard170873e2009-01-07 00:22:26 +0000438
showardd1195652009-12-08 22:21:02 +0000439 def _create_recovery_agent_tasks(self):
440 return (self._get_queue_entry_agent_tasks()
441 + self._get_special_task_agent_tasks(is_active=True))
442
443
444 def _get_queue_entry_agent_tasks(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700445 """
446 Get agent tasks for all hqe in the specified states.
447
448 Loosely this translates to taking a hqe in one of the specified states,
449 say parsing, and getting an AgentTask for it, like the FinalReparseTask,
450 through _get_agent_task_for_queue_entry. Each queue entry can only have
451 one agent task at a time, but there might be multiple queue entries in
452 the group.
453
454 @return: A list of AgentTasks.
455 """
showardd1195652009-12-08 22:21:02 +0000456 # host queue entry statuses handled directly by AgentTasks (Verifying is
457 # handled through SpecialTasks, so is not listed here)
458 statuses = (models.HostQueueEntry.Status.STARTING,
459 models.HostQueueEntry.Status.RUNNING,
460 models.HostQueueEntry.Status.GATHERING,
mbligh4608b002010-01-05 18:22:35 +0000461 models.HostQueueEntry.Status.PARSING,
462 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000463 status_list = ','.join("'%s'" % status for status in statuses)
jamesrenc44ae992010-02-19 00:12:54 +0000464 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardd1195652009-12-08 22:21:02 +0000465 where='status IN (%s)' % status_list)
466
467 agent_tasks = []
468 used_queue_entries = set()
469 for entry in queue_entries:
470 if self.get_agents_for_entry(entry):
471 # already being handled
472 continue
473 if entry in used_queue_entries:
474 # already picked up by a synchronous job
475 continue
476 agent_task = self._get_agent_task_for_queue_entry(entry)
477 agent_tasks.append(agent_task)
478 used_queue_entries.update(agent_task.queue_entries)
479 return agent_tasks
showard170873e2009-01-07 00:22:26 +0000480
481
showardd1195652009-12-08 22:21:02 +0000482 def _get_special_task_agent_tasks(self, is_active=False):
483 special_tasks = models.SpecialTask.objects.filter(
484 is_active=is_active, is_complete=False)
485 return [self._get_agent_task_for_special_task(task)
486 for task in special_tasks]
487
488
489 def _get_agent_task_for_queue_entry(self, queue_entry):
490 """
beeps8bb1f7d2013-08-05 01:30:09 -0700491 Construct an AgentTask instance for the given active HostQueueEntry.
492
showardd1195652009-12-08 22:21:02 +0000493 @param queue_entry: a HostQueueEntry
beeps8bb1f7d2013-08-05 01:30:09 -0700494 @return: an AgentTask to run the queue entry
showardd1195652009-12-08 22:21:02 +0000495 """
496 task_entries = queue_entry.job.get_group_entries(queue_entry)
497 self._check_for_duplicate_host_entries(task_entries)
498
499 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
500 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000501 if queue_entry.is_hostless():
502 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000503 return QueueTask(queue_entries=task_entries)
504 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
505 return GatherLogsTask(queue_entries=task_entries)
506 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
507 return FinalReparseTask(queue_entries=task_entries)
mbligh4608b002010-01-05 18:22:35 +0000508 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
509 return ArchiveResultsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000510
Dale Curtisaa513362011-03-01 17:27:44 -0800511 raise host_scheduler.SchedulerError(
512 '_get_agent_task_for_queue_entry got entry with '
513 'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000514
515
516 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000517 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
518 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000519 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000520 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000521 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000522 if using_host:
showardd1195652009-12-08 22:21:02 +0000523 self._assert_host_has_no_agent(task_entry)
524
525
526 def _assert_host_has_no_agent(self, entry):
527 """
528 @param entry: a HostQueueEntry or a SpecialTask
529 """
530 if self.host_has_agent(entry.host):
531 agent = tuple(self._host_agents.get(entry.host.id))[0]
Dale Curtisaa513362011-03-01 17:27:44 -0800532 raise host_scheduler.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000533 'While scheduling %s, host %s already has a host agent %s'
534 % (entry, entry.host, agent.task))
535
536
537 def _get_agent_task_for_special_task(self, special_task):
538 """
539 Construct an AgentTask class to run the given SpecialTask and add it
540 to this dispatcher.
beeps8bb1f7d2013-08-05 01:30:09 -0700541
542 A special task is create through schedule_special_tasks, but only if
543 the host doesn't already have an agent. This happens through
544 add_agent_task. All special agent tasks are given a host on creation,
545 and a Null hqe. To create a SpecialAgentTask object, you need a
546 models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
547 object contains a hqe it's passed on to the special agent task, which
548 creates a HostQueueEntry and saves it as it's queue_entry.
549
showardd1195652009-12-08 22:21:02 +0000550 @param special_task: a models.SpecialTask instance
551 @returns an AgentTask to run this SpecialTask
552 """
553 self._assert_host_has_no_agent(special_task)
554
Dan Shi07e09af2013-04-12 09:31:29 -0700555 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask,
Alex Millerdfff2fd2013-05-28 13:05:06 -0700556 ResetTask, ProvisionTask)
showardd1195652009-12-08 22:21:02 +0000557 for agent_task_class in special_agent_task_classes:
558 if agent_task_class.TASK_TYPE == special_task.task:
559 return agent_task_class(task=special_task)
560
Dale Curtisaa513362011-03-01 17:27:44 -0800561 raise host_scheduler.SchedulerError(
562 'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000563
564
565 def _register_pidfiles(self, agent_tasks):
566 for agent_task in agent_tasks:
567 agent_task.register_necessary_pidfiles()
568
569
570 def _recover_tasks(self, agent_tasks):
571 orphans = _drone_manager.get_orphaned_autoserv_processes()
572
573 for agent_task in agent_tasks:
574 agent_task.recover()
575 if agent_task.monitor and agent_task.monitor.has_process():
576 orphans.discard(agent_task.monitor.get_process())
577 self.add_agent_task(agent_task)
578
579 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000580
581
showard8cc058f2009-09-08 16:26:33 +0000582 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000583 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
584 % status):
showard0db3d432009-10-12 20:29:15 +0000585 if entry.status == status and not self.get_agents_for_entry(entry):
586 # The status can change during iteration, e.g., if job.run()
587 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000588 yield entry
589
590
showard6878e8b2009-07-20 22:37:45 +0000591 def _check_for_remaining_orphan_processes(self, orphans):
592 if not orphans:
593 return
594 subject = 'Unrecovered orphan autoserv processes remain'
595 message = '\n'.join(str(process) for process in orphans)
596 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000597
598 die_on_orphans = global_config.global_config.get_config_value(
599 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
600
601 if die_on_orphans:
602 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000603
showard170873e2009-01-07 00:22:26 +0000604
showard8cc058f2009-09-08 16:26:33 +0000605 def _recover_pending_entries(self):
606 for entry in self._get_unassigned_entries(
607 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000608 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000609 entry.on_pending()
610
611
showardb8900452009-10-12 20:31:01 +0000612 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000613 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000614 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
615 unrecovered_hqes = []
616 for queue_entry in queue_entries:
617 special_tasks = models.SpecialTask.objects.filter(
618 task__in=(models.SpecialTask.Task.CLEANUP,
619 models.SpecialTask.Task.VERIFY),
620 queue_entry__id=queue_entry.id,
621 is_complete=False)
622 if special_tasks.count() == 0:
623 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000624
showardb8900452009-10-12 20:31:01 +0000625 if unrecovered_hqes:
626 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Dale Curtisaa513362011-03-01 17:27:44 -0800627 raise host_scheduler.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000628 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000629 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000630
631
showard65db3932009-10-28 19:54:35 +0000632 def _get_prioritized_special_tasks(self):
633 """
634 Returns all queued SpecialTasks prioritized for repair first, then
635 cleanup, then verify.
beeps8bb1f7d2013-08-05 01:30:09 -0700636
637 @return: list of afe.models.SpecialTasks sorted according to priority.
showard65db3932009-10-28 19:54:35 +0000638 """
639 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
640 is_complete=False,
641 host__locked=False)
642 # exclude hosts with active queue entries unless the SpecialTask is for
643 # that queue entry
showard7e67b432010-01-20 01:13:04 +0000644 queued_tasks = models.SpecialTask.objects.add_join(
showardeab66ce2009-12-23 00:03:56 +0000645 queued_tasks, 'afe_host_queue_entries', 'host_id',
646 join_condition='afe_host_queue_entries.active',
showard7e67b432010-01-20 01:13:04 +0000647 join_from_key='host_id', force_left_join=True)
showard65db3932009-10-28 19:54:35 +0000648 queued_tasks = queued_tasks.extra(
showardeab66ce2009-12-23 00:03:56 +0000649 where=['(afe_host_queue_entries.id IS NULL OR '
650 'afe_host_queue_entries.id = '
651 'afe_special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000652
showard65db3932009-10-28 19:54:35 +0000653 # reorder tasks by priority
654 task_priority_order = [models.SpecialTask.Task.REPAIR,
655 models.SpecialTask.Task.CLEANUP,
Dan Shi07e09af2013-04-12 09:31:29 -0700656 models.SpecialTask.Task.VERIFY,
Alex Millerdfff2fd2013-05-28 13:05:06 -0700657 models.SpecialTask.Task.RESET,
658 models.SpecialTask.Task.PROVISION]
showard65db3932009-10-28 19:54:35 +0000659 def task_priority_key(task):
660 return task_priority_order.index(task.task)
661 return sorted(queued_tasks, key=task_priority_key)
662
663
showard65db3932009-10-28 19:54:35 +0000664 def _schedule_special_tasks(self):
665 """
666 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700667
668 Special tasks include PreJobTasks like verify, reset and cleanup.
669 They are created through _schedule_new_jobs and associated with a hqe
670 This method translates SpecialTasks to the appropriate AgentTask and
671 adds them to the dispatchers agents list, so _handle_agents can execute
672 them.
showard65db3932009-10-28 19:54:35 +0000673 """
674 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000675 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000676 continue
showardd1195652009-12-08 22:21:02 +0000677 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000678
679
showard170873e2009-01-07 00:22:26 +0000680 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000681 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000682 # should never happen
showarded2afea2009-07-07 20:54:07 +0000683 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000684 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000685 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700686 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000687 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000688
689
jadmanski0afbb632008-06-06 21:10:57 +0000690 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000691 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700692 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000693 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000694 if self.host_has_agent(host):
695 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000696 continue
showard8cc058f2009-09-08 16:26:33 +0000697 if self._host_has_scheduled_special_task(host):
698 # host will have a special task scheduled on the next cycle
699 continue
showard170873e2009-01-07 00:22:26 +0000700 if print_message:
showardb18134f2009-03-20 20:52:18 +0000701 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000702 models.SpecialTask.objects.create(
703 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000704 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000705
706
jadmanski0afbb632008-06-06 21:10:57 +0000707 def _recover_hosts(self):
708 # recover "Repair Failed" hosts
709 message = 'Reverifying dead host %s'
710 self._reverify_hosts_where("status = 'Repair Failed'",
711 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000712
713
showard04c82c52008-05-29 19:38:12 +0000714
showardb95b1bd2008-08-15 18:11:04 +0000715 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000716 # prioritize by job priority, then non-metahost over metahost, then FIFO
jamesrenc44ae992010-02-19 00:12:54 +0000717 return list(scheduler_models.HostQueueEntry.fetch(
showardeab66ce2009-12-23 00:03:56 +0000718 joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000719 where='NOT complete AND NOT active AND status="Queued"',
showardeab66ce2009-12-23 00:03:56 +0000720 order_by='afe_jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000721
722
showard89f84db2009-03-12 20:39:13 +0000723 def _refresh_pending_queue_entries(self):
724 """
725 Lookup the pending HostQueueEntries and call our HostScheduler
726 refresh() method given that list. Return the list.
727
728 @returns A list of pending HostQueueEntries sorted in priority order.
729 """
showard63a34772008-08-18 19:32:50 +0000730 queue_entries = self._get_pending_queue_entries()
731 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000732 return []
showardb95b1bd2008-08-15 18:11:04 +0000733
showard63a34772008-08-18 19:32:50 +0000734 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000735
showard89f84db2009-03-12 20:39:13 +0000736 return queue_entries
737
738
739 def _schedule_atomic_group(self, queue_entry):
740 """
741 Schedule the given queue_entry on an atomic group of hosts.
742
743 Returns immediately if there are insufficient available hosts.
744
745 Creates new HostQueueEntries based off of queue_entry for the
746 scheduled hosts and starts them all running.
747 """
748 # This is a virtual host queue entry representing an entire
749 # atomic group, find a group and schedule their hosts.
750 group_hosts = self._host_scheduler.find_eligible_atomic_group(
751 queue_entry)
752 if not group_hosts:
753 return
showardcbe6f942009-06-17 19:33:49 +0000754
755 logging.info('Expanding atomic group entry %s with hosts %s',
756 queue_entry,
757 ', '.join(host.hostname for host in group_hosts))
jamesren883492a2010-02-12 00:45:18 +0000758
showard89f84db2009-03-12 20:39:13 +0000759 for assigned_host in group_hosts[1:]:
760 # Create a new HQE for every additional assigned_host.
jamesrenc44ae992010-02-19 00:12:54 +0000761 new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000762 new_hqe.save()
jamesren883492a2010-02-12 00:45:18 +0000763 new_hqe.set_host(assigned_host)
764 self._run_queue_entry(new_hqe)
765
766 # The first assigned host uses the original HostQueueEntry
767 queue_entry.set_host(group_hosts[0])
768 self._run_queue_entry(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000769
770
showarda9545c02009-12-18 22:44:26 +0000771 def _schedule_hostless_job(self, queue_entry):
772 self.add_agent_task(HostlessQueueTask(queue_entry))
jamesren47bd7372010-03-13 00:58:17 +0000773 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000774
775
showard89f84db2009-03-12 20:39:13 +0000776 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700777 """
778 Find any new HQEs and call schedule_pre_job_tasks for it.
779
780 This involves setting the status of the HQE and creating a row in the
781 db corresponding the the special task, through
782 scheduler_models._queue_special_task. The new db row is then added as
783 an agent to the dispatcher through _schedule_special_tasks and
784 scheduled for execution on the drone through _handle_agents.
785 """
showard89f84db2009-03-12 20:39:13 +0000786 queue_entries = self._refresh_pending_queue_entries()
787 if not queue_entries:
788 return
789
beepsb255fc52013-10-13 23:28:54 -0700790 new_hostless_jobs = 0
791 new_atomic_groups = 0
792 new_jobs_with_hosts = 0
793 new_jobs_need_hosts = 0
794
Simran Basi3f6717d2012-09-13 15:21:22 -0700795 logging.debug('Processing %d queue_entries', len(queue_entries))
showard63a34772008-08-18 19:32:50 +0000796 for queue_entry in queue_entries:
Simran Basidef92872012-09-20 13:34:34 -0700797 self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
showarde55955f2009-10-07 20:48:58 +0000798 is_unassigned_atomic_group = (
799 queue_entry.atomic_group_id is not None
800 and queue_entry.host_id is None)
jamesren883492a2010-02-12 00:45:18 +0000801
802 if queue_entry.is_hostless():
Simran Basidef92872012-09-20 13:34:34 -0700803 self._log_extra_msg('Scheduling hostless job.')
showarda9545c02009-12-18 22:44:26 +0000804 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700805 new_hostless_jobs = new_hostless_jobs + 1
jamesren883492a2010-02-12 00:45:18 +0000806 elif is_unassigned_atomic_group:
807 self._schedule_atomic_group(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700808 new_atmoic_groups = new_atomic_groups + 1
showarde55955f2009-10-07 20:48:58 +0000809 else:
beepsb255fc52013-10-13 23:28:54 -0700810 new_jobs_need_hosts = new_jobs_need_hosts + 1
jamesren883492a2010-02-12 00:45:18 +0000811 assigned_host = self._host_scheduler.schedule_entry(queue_entry)
showard65db3932009-10-28 19:54:35 +0000812 if assigned_host and not self.host_has_agent(assigned_host):
jamesren883492a2010-02-12 00:45:18 +0000813 assert assigned_host.id == queue_entry.host_id
814 self._run_queue_entry(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700815 new_jobs_with_hosts = new_jobs_with_hosts + 1
816
817 key = 'scheduler.jobs_per_tick'
818 stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
819 stats.Gauge(key).send('new_atomic_groups', new_atomic_groups)
820 stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
821 stats.Gauge(key).send('new_jobs_without_hosts',
822 new_jobs_need_hosts - new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000823
824
showard8cc058f2009-09-08 16:26:33 +0000825 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700826 """
827 Adds agents to the dispatcher.
828
829 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
830 QueueTask for example, will have a job with a control file, and
831 the agent will have methods that poll, abort and check if the queue
832 task is finished. The dispatcher runs the agent_task, as well as
833 other agents in it's _agents member, through _handle_agents, by
834 calling the Agents tick().
835
836 This method creates an agent for each HQE in one of (starting, running,
837 gathering, parsing, archiving) states, and adds it to the dispatcher so
838 it is handled by _handle_agents.
839 """
showardd1195652009-12-08 22:21:02 +0000840 for agent_task in self._get_queue_entry_agent_tasks():
841 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000842
843
844 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000845 for entry in scheduler_models.HostQueueEntry.fetch(
846 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000847 task = entry.job.schedule_delayed_callback_task(entry)
848 if task:
showardd1195652009-12-08 22:21:02 +0000849 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000850
851
jamesren883492a2010-02-12 00:45:18 +0000852 def _run_queue_entry(self, queue_entry):
Simran Basidef92872012-09-20 13:34:34 -0700853 self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
854 queue_entry)
jamesren883492a2010-02-12 00:45:18 +0000855 queue_entry.schedule_pre_job_tasks()
mblighd5c95802008-03-05 00:33:46 +0000856
857
jadmanski0afbb632008-06-06 21:10:57 +0000858 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700859 """
860 Looks through the afe_host_queue_entries for an aborted entry.
861
862 The aborted bit is set on an HQE in many ways, the most common
863 being when a user requests an abort through the frontend, which
864 results in an rpc from the afe to abort_host_queue_entries.
865 """
jamesrene7c65cb2010-06-08 20:38:10 +0000866 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000867 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700868 where='aborted=1 and complete=0'):
showardf4a2e502009-07-28 20:06:39 +0000869 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +0000870 for agent in self.get_agents_for_entry(entry):
871 agent.abort()
872 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000873 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700874 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000875 for job in jobs_to_stop:
876 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000877
878
beeps8bb1f7d2013-08-05 01:30:09 -0700879 def _find_aborted_special_tasks(self):
880 """
881 Find SpecialTasks that have been marked for abortion.
882
883 Poll the database looking for SpecialTasks that are active
884 and have been marked for abortion, then abort them.
885 """
886
887 # The completed and active bits are very important when it comes
888 # to scheduler correctness. The active bit is set through the prolog
889 # of a special task, and reset through the cleanup method of the
890 # SpecialAgentTask. The cleanup is called both through the abort and
891 # epilog. The complete bit is set in several places, and in general
892 # a hanging job will have is_active=1 is_complete=0, while a special
893 # task which completed will have is_active=0 is_complete=1. To check
894 # aborts we directly check active because the complete bit is set in
895 # several places, including the epilog of agent tasks.
896 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
897 is_aborted=True)
898 for task in aborted_tasks:
899 # There are 2 ways to get the agent associated with a task,
900 # through the host and through the hqe. A special task
901 # always needs a host, but doesn't always need a hqe.
902 for agent in self._host_agents.get(task.host.id, []):
903 if isinstance(agent.task, SpecialAgentTask):
904
905 # The epilog preforms critical actions such as
906 # queueing the next SpecialTask, requeuing the
907 # hqe etc, however it doesn't actually kill the
908 # monitor process and set the 'done' bit. Epilogs
909 # assume that the job failed, and that the monitor
910 # process has already written an exit code. The
911 # done bit is a necessary condition for
912 # _handle_agents to schedule any more special
913 # tasks against the host, and it must be set
914 # in addition to is_active, is_complete and success.
915 agent.task.epilog()
916 agent.task.abort()
917
918
showard324bf812009-01-20 23:23:38 +0000919 def _can_start_agent(self, agent, num_started_this_cycle,
920 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000921 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000922 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000923 return True
924 # don't allow any nonzero-process agents to run after we've reached a
925 # limit (this avoids starvation of many-process agents)
926 if have_reached_limit:
927 return False
928 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000929 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000930 agent.task.owner_username,
931 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000932 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000933 return False
934 # if a single agent exceeds the per-cycle throttling, still allow it to
935 # run when it's the first agent in the cycle
936 if num_started_this_cycle == 0:
937 return True
938 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +0000939 if (num_started_this_cycle + agent.task.num_processes >
940 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000941 return False
942 return True
943
944
jadmanski0afbb632008-06-06 21:10:57 +0000945 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700946 """
947 Handles agents of the dispatcher.
948
949 Appropriate Agents are added to the dispatcher through
950 _schedule_running_host_queue_entries. These agents each
951 have a task. This method runs the agents task through
952 agent.tick() leading to:
953 agent.start
954 prolog -> AgentTasks prolog
955 For each queue entry:
956 sets host status/status to Running
957 set started_on in afe_host_queue_entries
958 run -> AgentTasks run
959 Creates PidfileRunMonitor
960 Queues the autoserv command line for this AgentTask
961 via the drone manager. These commands are executed
962 through the drone managers execute actions.
963 poll -> AgentTasks/BaseAgentTask poll
964 checks the monitors exit_code.
965 Executes epilog if task is finished.
966 Executes AgentTasks _finish_task
967 finish_task is usually responsible for setting the status
968 of the HQE/host, and updating it's active and complete fileds.
969
970 agent.is_done
971 Removed the agent from the dispatchers _agents queue.
972 Is_done checks the finished bit on the agent, that is
973 set based on the Agents task. During the agents poll
974 we check to see if the monitor process has exited in
975 it's finish method, and set the success member of the
976 task based on this exit code.
977 """
jadmanski0afbb632008-06-06 21:10:57 +0000978 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000979 have_reached_limit = False
980 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -0700981 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +0000982 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -0700983 self._log_extra_msg('Processing Agent with Host Ids: %s and '
984 'queue_entry ids:%s' % (agent.host_ids,
985 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +0000986 if not agent.started:
showard324bf812009-01-20 23:23:38 +0000987 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000988 have_reached_limit):
989 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -0700990 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +0000991 continue
showardd1195652009-12-08 22:21:02 +0000992 num_started_this_cycle += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -0700993 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +0000994 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -0700995 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +0000996 if agent.is_done():
Simran Basidef92872012-09-20 13:34:34 -0700997 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +0000998 self.remove_agent(agent)
Simran Basi3f6717d2012-09-13 15:21:22 -0700999 logging.info('%d running processes. %d added this cycle.',
1000 _drone_manager.total_running_processes(),
1001 num_started_this_cycle)
mbligh36768f02008-02-22 18:28:33 +00001002
1003
showard29f7cd22009-04-29 21:16:24 +00001004 def _process_recurring_runs(self):
1005 recurring_runs = models.RecurringRun.objects.filter(
1006 start_date__lte=datetime.datetime.now())
1007 for rrun in recurring_runs:
1008 # Create job from template
1009 job = rrun.job
1010 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001011 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001012
1013 host_objects = info['hosts']
1014 one_time_hosts = info['one_time_hosts']
1015 metahost_objects = info['meta_hosts']
1016 dependencies = info['dependencies']
1017 atomic_group = info['atomic_group']
1018
1019 for host in one_time_hosts or []:
1020 this_host = models.Host.create_one_time_host(host.hostname)
1021 host_objects.append(this_host)
1022
1023 try:
1024 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001025 options=options,
showard29f7cd22009-04-29 21:16:24 +00001026 host_objects=host_objects,
1027 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001028 atomic_group=atomic_group)
1029
1030 except Exception, ex:
1031 logging.exception(ex)
1032 #TODO send email
1033
1034 if rrun.loop_count == 1:
1035 rrun.delete()
1036 else:
1037 if rrun.loop_count != 0: # if not infinite loop
1038 # calculate new start_date
1039 difference = datetime.timedelta(seconds=rrun.loop_period)
1040 rrun.start_date = rrun.start_date + difference
1041 rrun.loop_count -= 1
1042 rrun.save()
1043
1044
# Allow a site-specific dispatcher subclass to be dropped in:
# import_site_class returns site_monitor_db.SiteDispatcher when that module
# exists, otherwise it falls back to BaseDispatcher unchanged.
SiteDispatcher = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    """Concrete dispatcher: BaseDispatcher plus any site customizations."""
    pass
1051
1052
class PidfileRunMonitor(object):
    """
    Tracks a single autoserv process via its pidfile in the results dir.

    Client must call either run() to start a new process or
    attach_to_existing_process().

    State is cached in self._state (a drone_manager.PidfileContents) and
    refreshed lazily by _get_pidfile_info() on each query.
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # True once we have given up on the process (see on_lost_process).
        self.lost_process = False
        # Wall-clock time of run()/attach; used for the pidfile timeout.
        self._start_time = None
        # Handle from the drone manager identifying our pidfile.
        self.pidfile_id = None
        # Last-read pidfile contents (process, exit_status, num_tests_failed).
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        # Prefix `command` with nice at the given level; no-op for a falsy
        # level. NOTE(review): appears unused -- run() inlines this logic
        # (with an `is not None` check instead); confirm before removing.
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        # Record when we started/attached, for the pidfile-write timeout.
        self._start_time = time.time()


    def run(self, command, working_directory, num_processes, nice_level=None,
            log_file=None, pidfile_name=None, paired_with_pidfile=None,
            username=None, drone_hostnames_allowed=None):
        """Start `command` on a drone and begin monitoring its pidfile.

        Arguments are forwarded to DroneManager.execute_command; when
        nice_level is given the command is wrapped in `nice -n <level>`.
        """
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            num_processes=num_processes, log_file=log_file,
            paired_with_pidfile=paired_with_pidfile, username=username,
            drone_hostnames_allowed=drone_hostnames_allowed)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=drone_manager.AUTOSERV_PID_FILE,
                                   num_processes=None):
        """Monitor an already-running process via its existing pidfile.

        Used during scheduler recovery. If num_processes is given it is
        declared to the drone manager for process-count accounting.
        """
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_path, pidfile_name=pidfile_name)
        if num_processes is not None:
            _drone_manager.declare_process_count(self.pidfile_id, num_processes)


    def kill(self):
        """Kill the monitored process, if one is known to exist."""
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        """Return True if a process has been identified from the pidfile."""
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        """Return the monitored Process; caller must ensure has_process()."""
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        # Refresh self._state from the drone manager's cached pidfile
        # contents. Raises _PidfileException (and resets state) on invalid
        # contents; use_second_read requests a re-fetched copy.
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        # Report the error by email and write off the process as lost.
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        # Core refresh logic; may raise _PidfileException (handled by
        # caller _get_pidfile_info).
        if self.lost_process:
            # Already gave up on this process; state is frozen.
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.

        Only escalates (email + lost-process) once the configured pidfile
        timeout has elapsed since run()/attach; before that it is normal
        for the pidfile not to exist yet.
        """
        message = 'No pid found at %s' % self.pidfile_id
        if time.time() - self._start_time > _get_pidfile_timeout_secs():
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        # Synthesize a failing exit so callers treat the run as failed.
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        """Return the process exit status, or None if still running."""
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """@returns The number of tests that failed or -1 if unknown."""
        self._get_pidfile_info()
        if self._state.num_tests_failed is None:
            return -1
        return self._state.num_tests_failed


    def try_copy_results_on_drone(self, **kwargs):
        """Copy result logs on the drone, if we have a process (best-effort)."""
        if self.has_process():
            # copy results logs into the normal place for job results
            _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)


    def try_copy_to_results_repository(self, source, **kwargs):
        """Copy `source` to the results repository, if we have a process."""
        if self.has_process():
            _drone_manager.copy_to_results_repository(self.get_process(),
                                                      source, **kwargs)
1228
mbligh36768f02008-02-22 18:28:33 +00001229class Agent(object):
showard77182562009-06-10 00:16:05 +00001230 """
Alex Miller47715eb2013-07-24 03:34:01 -07001231 An agent for use by the Dispatcher class to perform a task. An agent wraps
1232 around an AgentTask mainly to associate the AgentTask with the queue_entry
1233 and host ids.
showard77182562009-06-10 00:16:05 +00001234
1235 The following methods are required on all task objects:
1236 poll() - Called periodically to let the task check its status and
1237 update its internal state. If the task succeeded.
1238 is_done() - Returns True if the task is finished.
1239 abort() - Called when an abort has been requested. The task must
1240 set its aborted attribute to True if it actually aborted.
1241
1242 The following attributes are required on all task objects:
1243 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001244 success - bool, True if this task succeeded.
1245 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1246 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001247 """
1248
1249
showard418785b2009-11-23 20:19:59 +00001250 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001251 """
Alex Miller47715eb2013-07-24 03:34:01 -07001252 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001253 """
showard8cc058f2009-09-08 16:26:33 +00001254 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001255
showard77182562009-06-10 00:16:05 +00001256 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001257 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001258
showard8cc058f2009-09-08 16:26:33 +00001259 self.queue_entry_ids = task.queue_entry_ids
1260 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001261
showard8cc058f2009-09-08 16:26:33 +00001262 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001263 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001264
1265
jadmanski0afbb632008-06-06 21:10:57 +00001266 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001267 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001268 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001269 self.task.poll()
1270 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001271 self.finished = True
showardec113162008-05-08 00:52:49 +00001272
1273
jadmanski0afbb632008-06-06 21:10:57 +00001274 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001275 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001276
1277
showardd3dc1992009-04-22 21:01:40 +00001278 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001279 if self.task:
1280 self.task.abort()
1281 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001282 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001283 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001284
showardd3dc1992009-04-22 21:01:40 +00001285
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001286class BaseAgentTask(object):
showardd1195652009-12-08 22:21:02 +00001287 class _NullMonitor(object):
1288 pidfile_id = None
1289
1290 def has_process(self):
1291 return True
1292
1293
1294 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001295 """
showardd1195652009-12-08 22:21:02 +00001296 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001297 """
jadmanski0afbb632008-06-06 21:10:57 +00001298 self.done = False
showardd1195652009-12-08 22:21:02 +00001299 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001300 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001301 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001302 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001303 self.queue_entry_ids = []
1304 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001305 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001306
1307
1308 def _set_ids(self, host=None, queue_entries=None):
1309 if queue_entries and queue_entries != [None]:
1310 self.host_ids = [entry.host.id for entry in queue_entries]
1311 self.queue_entry_ids = [entry.id for entry in queue_entries]
1312 else:
1313 assert host
1314 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001315
1316
jadmanski0afbb632008-06-06 21:10:57 +00001317 def poll(self):
showard08a36412009-05-05 01:01:13 +00001318 if not self.started:
1319 self.start()
showardd1195652009-12-08 22:21:02 +00001320 if not self.done:
1321 self.tick()
showard08a36412009-05-05 01:01:13 +00001322
1323
1324 def tick(self):
showardd1195652009-12-08 22:21:02 +00001325 assert self.monitor
1326 exit_code = self.monitor.exit_code()
1327 if exit_code is None:
1328 return
mbligh36768f02008-02-22 18:28:33 +00001329
showardd1195652009-12-08 22:21:02 +00001330 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001331 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001332
1333
jadmanski0afbb632008-06-06 21:10:57 +00001334 def is_done(self):
1335 return self.done
mbligh36768f02008-02-22 18:28:33 +00001336
1337
jadmanski0afbb632008-06-06 21:10:57 +00001338 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001339 if self.done:
showardd1195652009-12-08 22:21:02 +00001340 assert self.started
showard08a36412009-05-05 01:01:13 +00001341 return
showardd1195652009-12-08 22:21:02 +00001342 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001343 self.done = True
1344 self.success = success
1345 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001346
1347
jadmanski0afbb632008-06-06 21:10:57 +00001348 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001349 """
1350 To be overridden.
1351 """
showarded2afea2009-07-07 20:54:07 +00001352 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001353 self.register_necessary_pidfiles()
1354
1355
1356 def _log_file(self):
1357 if not self._log_file_name:
1358 return None
1359 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001360
mbligh36768f02008-02-22 18:28:33 +00001361
jadmanski0afbb632008-06-06 21:10:57 +00001362 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001363 log_file = self._log_file()
1364 if self.monitor and log_file:
1365 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001366
1367
jadmanski0afbb632008-06-06 21:10:57 +00001368 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001369 """
1370 To be overridden.
1371 """
jadmanski0afbb632008-06-06 21:10:57 +00001372 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001373 logging.info("%s finished with success=%s", type(self).__name__,
1374 self.success)
1375
mbligh36768f02008-02-22 18:28:33 +00001376
jadmanski0afbb632008-06-06 21:10:57 +00001377 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001378 if not self.started:
1379 self.prolog()
1380 self.run()
1381
1382 self.started = True
1383
1384
1385 def abort(self):
1386 if self.monitor:
1387 self.monitor.kill()
1388 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001389 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001390 self.cleanup()
1391
1392
showarded2afea2009-07-07 20:54:07 +00001393 def _get_consistent_execution_path(self, execution_entries):
1394 first_execution_path = execution_entries[0].execution_path()
1395 for execution_entry in execution_entries[1:]:
1396 assert execution_entry.execution_path() == first_execution_path, (
1397 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1398 execution_entry,
1399 first_execution_path,
1400 execution_entries[0]))
1401 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001402
1403
showarded2afea2009-07-07 20:54:07 +00001404 def _copy_results(self, execution_entries, use_monitor=None):
1405 """
1406 @param execution_entries: list of objects with execution_path() method
1407 """
showard6d1c1432009-08-20 23:30:39 +00001408 if use_monitor is not None and not use_monitor.has_process():
1409 return
1410
showarded2afea2009-07-07 20:54:07 +00001411 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001412 if use_monitor is None:
1413 assert self.monitor
1414 use_monitor = self.monitor
1415 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001416 execution_path = self._get_consistent_execution_path(execution_entries)
1417 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001418 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001419
showarda1e74b32009-05-12 17:32:04 +00001420
1421 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001422 for queue_entry in queue_entries:
1423 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001424
1425
mbligh4608b002010-01-05 18:22:35 +00001426 def _archive_results(self, queue_entries):
1427 for queue_entry in queue_entries:
1428 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001429
1430
showardd1195652009-12-08 22:21:02 +00001431 def _command_line(self):
1432 """
1433 Return the command line to run. Must be overridden.
1434 """
1435 raise NotImplementedError
1436
1437
1438 @property
1439 def num_processes(self):
1440 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001441 Return the number of processes forked by this BaseAgentTask's process.
1442 It may only be approximate. To be overridden if necessary.
showardd1195652009-12-08 22:21:02 +00001443 """
1444 return 1
1445
1446
1447 def _paired_with_monitor(self):
1448 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001449 If this BaseAgentTask's process must run on the same machine as some
showardd1195652009-12-08 22:21:02 +00001450 previous process, this method should be overridden to return a
1451 PidfileRunMonitor for that process.
1452 """
1453 return self._NullMonitor()
1454
1455
1456 @property
1457 def owner_username(self):
1458 """
1459 Return login of user responsible for this task. May be None. Must be
1460 overridden.
1461 """
1462 raise NotImplementedError
1463
1464
1465 def _working_directory(self):
1466 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001467 Return the directory where this BaseAgentTask's process executes.
1468 Must be overridden.
showardd1195652009-12-08 22:21:02 +00001469 """
1470 raise NotImplementedError
1471
1472
1473 def _pidfile_name(self):
1474 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001475 Return the name of the pidfile this BaseAgentTask's process uses. To be
showardd1195652009-12-08 22:21:02 +00001476 overridden if necessary.
1477 """
jamesrenc44ae992010-02-19 00:12:54 +00001478 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001479
1480
1481 def _check_paired_results_exist(self):
1482 if not self._paired_with_monitor().has_process():
1483 email_manager.manager.enqueue_notify_email(
1484 'No paired results in task',
1485 'No paired results in task %s at %s'
1486 % (self, self._paired_with_monitor().pidfile_id))
1487 self.finished(False)
1488 return False
1489 return True
1490
1491
1492 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001493 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001494 self.monitor = PidfileRunMonitor()
1495
1496
1497 def run(self):
1498 if not self._check_paired_results_exist():
1499 return
1500
1501 self._create_monitor()
1502 self.monitor.run(
1503 self._command_line(), self._working_directory(),
1504 num_processes=self.num_processes,
1505 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1506 pidfile_name=self._pidfile_name(),
1507 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001508 username=self.owner_username,
1509 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1510
1511
1512 def get_drone_hostnames_allowed(self):
1513 if not models.DroneSet.drone_sets_enabled():
1514 return None
1515
1516 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1517 if not hqes:
1518 # Only special tasks could be missing host queue entries
1519 assert isinstance(self, SpecialAgentTask)
1520 return self._user_or_global_default_drone_set(
1521 self.task, self.task.requested_by)
1522
1523 job_ids = hqes.values_list('job', flat=True).distinct()
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001524 assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
jamesren76fcf192010-04-21 20:39:50 +00001525 "span multiple jobs")
1526
1527 job = models.Job.objects.get(id=job_ids[0])
1528 drone_set = job.drone_set
1529 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001530 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001531
1532 return drone_set.get_drone_hostnames()
1533
1534
1535 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1536 """
1537 Returns the user's default drone set, if present.
1538
1539 Otherwise, returns the global default drone set.
1540 """
1541 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1542 if not user:
1543 logging.warn('%s had no owner; using default drone set',
1544 obj_with_owner)
1545 return default_hostnames
1546 if not user.drone_set:
1547 logging.warn('User %s has no default drone set, using global '
1548 'default', user.login)
1549 return default_hostnames
1550 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001551
1552
1553 def register_necessary_pidfiles(self):
1554 pidfile_id = _drone_manager.get_pidfile_id_from(
1555 self._working_directory(), self._pidfile_name())
1556 _drone_manager.register_pidfile(pidfile_id)
1557
1558 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1559 if paired_pidfile_id:
1560 _drone_manager.register_pidfile(paired_pidfile_id)
1561
1562
1563 def recover(self):
1564 if not self._check_paired_results_exist():
1565 return
1566
1567 self._create_monitor()
1568 self.monitor.attach_to_existing_process(
1569 self._working_directory(), pidfile_name=self._pidfile_name(),
1570 num_processes=self.num_processes)
1571 if not self.monitor.has_process():
1572 # no process to recover; wait to be started normally
1573 self.monitor = None
1574 return
1575
1576 self.started = True
Aviv Keshet225bdfe2013-03-05 10:10:08 -08001577 logging.info('Recovering process %s for %s at %s',
1578 self.monitor.get_process(), type(self).__name__,
1579 self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001580
1581
mbligh4608b002010-01-05 18:22:35 +00001582 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1583 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001584 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001585 for entry in queue_entries:
1586 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001587 raise host_scheduler.SchedulerError(
1588 '%s attempting to start entry with invalid status %s: '
1589 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001590 invalid_host_status = (
1591 allowed_host_statuses is not None
1592 and entry.host.status not in allowed_host_statuses)
1593 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001594 raise host_scheduler.SchedulerError(
1595 '%s attempting to start on queue entry with invalid '
1596 'host status %s: %s'
1597 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001598
1599
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001600SiteAgentTask = utils.import_site_class(
1601 __file__, 'autotest_lib.scheduler.site_monitor_db',
1602 'SiteAgentTask', BaseAgentTask)
1603
1604class AgentTask(SiteAgentTask):
1605 pass
1606
1607
showardd9205182009-04-27 20:09:55 +00001608class TaskWithJobKeyvals(object):
1609 """AgentTask mixin providing functionality to help with job keyval files."""
1610 _KEYVAL_FILE = 'keyval'
1611 def _format_keyval(self, key, value):
1612 return '%s=%s' % (key, value)
1613
1614
1615 def _keyval_path(self):
1616 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001617 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001618
1619
1620 def _write_keyval_after_job(self, field, value):
1621 assert self.monitor
1622 if not self.monitor.has_process():
1623 return
1624 _drone_manager.write_lines_to_file(
1625 self._keyval_path(), [self._format_keyval(field, value)],
1626 paired_with_process=self.monitor.get_process())
1627
1628
1629 def _job_queued_keyval(self, job):
1630 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1631
1632
1633 def _write_job_finished(self):
1634 self._write_keyval_after_job("job_finished", int(time.time()))
1635
1636
showarddb502762009-09-09 15:31:20 +00001637 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1638 keyval_contents = '\n'.join(self._format_keyval(key, value)
1639 for key, value in keyval_dict.iteritems())
1640 # always end with a newline to allow additional keyvals to be written
1641 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001642 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001643 keyval_contents,
1644 file_path=keyval_path)
1645
1646
1647 def _write_keyvals_before_job(self, keyval_dict):
1648 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1649
1650
1651 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001652 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001653 host.hostname)
1654 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001655 all_labels = [ urllib.quote(label) for label in all_labels ]
showarddb502762009-09-09 15:31:20 +00001656 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1657 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1658
1659
Alex Millerc23a7f02013-08-27 17:36:42 -07001660class SelfThrottledTask(AgentTask):
1661 """
1662 Special AgentTask subclass that maintains its own global process limit.
1663 """
1664 _num_running_processes = 0
Fang Deng7bcc0982013-09-05 15:26:57 -07001665 # Last known limit of max processes, used to check whether
1666 # max processes config has been changed.
1667 _last_known_max_processes = 0
1668 # Whether an email should be sent to notifiy process limit being hit.
1669 _notification_on = True
1670 # Once process limit is hit, an email will be sent.
1671 # To prevent spams, do not send another email until
1672 # it drops to lower than the following level.
1673 REVIVE_NOTIFICATION_THRESHOLD = 0.80
Alex Millerc23a7f02013-08-27 17:36:42 -07001674
1675
1676 @classmethod
1677 def _increment_running_processes(cls):
1678 cls._num_running_processes += 1
Dan Shid0e09ab2013-09-09 15:28:55 -07001679 stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
1680 cls._num_running_processes)
Alex Millerc23a7f02013-08-27 17:36:42 -07001681
1682
1683 @classmethod
1684 def _decrement_running_processes(cls):
1685 cls._num_running_processes -= 1
Dan Shid0e09ab2013-09-09 15:28:55 -07001686 stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
1687 cls._num_running_processes)
Alex Millerc23a7f02013-08-27 17:36:42 -07001688
1689
1690 @classmethod
1691 def _max_processes(cls):
1692 raise NotImplementedError
1693
1694
1695 @classmethod
1696 def _can_run_new_process(cls):
1697 return cls._num_running_processes < cls._max_processes()
1698
1699
1700 def _process_started(self):
1701 return bool(self.monitor)
1702
1703
1704 def tick(self):
1705 # override tick to keep trying to start until the process count goes
1706 # down and we can, at which point we revert to default behavior
1707 if self._process_started():
1708 super(SelfThrottledTask, self).tick()
1709 else:
1710 self._try_starting_process()
1711
1712
1713 def run(self):
1714 # override run() to not actually run unless we can
1715 self._try_starting_process()
1716
1717
Fang Deng7bcc0982013-09-05 15:26:57 -07001718 @classmethod
1719 def _notify_process_limit_hit(cls):
1720 """Send an email to notify that process limit is hit."""
1721 if cls._notification_on:
1722 subject = '%s: hitting max process limit.' % cls.__name__
1723 message = ('Running processes/Max processes: %d/%d'
1724 % (cls._num_running_processes, cls._max_processes()))
1725 email_manager.manager.enqueue_notify_email(
1726 subject, message)
1727 cls._notification_on = False
1728
1729
1730 @classmethod
1731 def _reset_notification_switch_if_necessary(cls):
1732 """Reset _notification_on if necessary.
1733
1734 Set _notification_on to True on the following cases:
1735 1) If the limit of max processes configuration changes;
1736 2) If _notification_on is False and the number of running processes
1737 drops to lower than a level defined in REVIVE_NOTIFICATION_THRESHOLD.
1738
1739 """
1740 if cls._last_known_max_processes != cls._max_processes():
1741 cls._notification_on = True
1742 cls._last_known_max_processes = cls._max_processes()
1743 return
1744 percentage = float(cls._num_running_processes) / cls._max_processes()
1745 if (not cls._notification_on and
1746 percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
1747 cls._notification_on = True
1748
Alex Millerc23a7f02013-08-27 17:36:42 -07001749 def _try_starting_process(self):
Fang Deng7bcc0982013-09-05 15:26:57 -07001750 self._reset_notification_switch_if_necessary()
Alex Millerc23a7f02013-08-27 17:36:42 -07001751 if not self._can_run_new_process():
Fang Deng7bcc0982013-09-05 15:26:57 -07001752 self._notify_process_limit_hit()
Alex Millerc23a7f02013-08-27 17:36:42 -07001753 return
1754
1755 # actually run the command
1756 super(SelfThrottledTask, self).run()
1757 if self._process_started():
1758 self._increment_running_processes()
1759
1760
1761 def finished(self, success):
1762 super(SelfThrottledTask, self).finished(success)
1763 if self._process_started():
1764 self._decrement_running_processes()
1765
1766
showard8cc058f2009-09-08 16:26:33 +00001767class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001768 """
1769 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1770 """
1771
1772 TASK_TYPE = None
1773 host = None
1774 queue_entry = None
1775
showardd1195652009-12-08 22:21:02 +00001776 def __init__(self, task, extra_command_args):
1777 super(SpecialAgentTask, self).__init__()
1778
lmrb7c5d272010-04-16 06:34:04 +00001779 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001780
jamesrenc44ae992010-02-19 00:12:54 +00001781 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001782 self.queue_entry = None
1783 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001784 self.queue_entry = scheduler_models.HostQueueEntry(
1785 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001786
showarded2afea2009-07-07 20:54:07 +00001787 self.task = task
1788 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001789
1790
showard8cc058f2009-09-08 16:26:33 +00001791 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001792 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1793
1794
1795 def _command_line(self):
1796 return _autoserv_command_line(self.host.hostname,
1797 self._extra_command_args,
1798 queue_entry=self.queue_entry)
1799
1800
1801 def _working_directory(self):
1802 return self.task.execution_path()
1803
1804
1805 @property
1806 def owner_username(self):
1807 if self.task.requested_by:
1808 return self.task.requested_by.login
1809 return None
showard8cc058f2009-09-08 16:26:33 +00001810
1811
showarded2afea2009-07-07 20:54:07 +00001812 def prolog(self):
1813 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001814 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001815 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001816
1817
showardde634ee2009-01-30 01:44:24 +00001818 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001819 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001820
showard2fe3f1d2009-07-06 20:19:11 +00001821 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001822 return # don't fail metahost entries, they'll be reassigned
1823
showard2fe3f1d2009-07-06 20:19:11 +00001824 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001825 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001826 return # entry has been aborted
1827
Alex Millerdfff2fd2013-05-28 13:05:06 -07001828 self._actually_fail_queue_entry()
1829
1830
1831 # TODO(milleral): http://crbug.com/268607
1832 # All this used to be a part of _fail_queue_entry. The
1833 # exact semantics of when one should and should not be failing a queue
1834 # entry need to be worked out, because provisioning has placed us in a
1835 # case where we want to fail a queue entry that could be requeued,
1836 # which makes us fail the two above if statements, and thus
1837 # _fail_queue_entry() would exit early and have no effect.
1838 # What's left here with _actually_fail_queue_entry is a hack to be able to
1839 # bypass the checks and unconditionally execute the code.
1840 def _actually_fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001841 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001842 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001843 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001844 self._write_keyval_after_job(queued_key, queued_time)
1845 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001846
showard8cc058f2009-09-08 16:26:33 +00001847 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001848 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001849 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001850 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001851
showard8cc058f2009-09-08 16:26:33 +00001852 pidfile_id = _drone_manager.get_pidfile_id_from(
1853 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001854 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001855 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001856
1857 if self.queue_entry.job.parse_failed_repair:
1858 self._parse_results([self.queue_entry])
1859 else:
1860 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001861
Alex Miller23676a22013-07-03 09:03:36 -07001862 # Also fail all other special tasks that have not yet run for this HQE
1863 pending_tasks = models.SpecialTask.objects.filter(
1864 queue_entry__id=self.queue_entry.id,
1865 is_complete=0)
Alex Miller5e36ccc2013-08-03 16:31:58 -07001866 for task in pending_tasks:
1867 task.finish(False)
Alex Miller23676a22013-07-03 09:03:36 -07001868
showard8cc058f2009-09-08 16:26:33 +00001869
1870 def cleanup(self):
1871 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001872
1873 # We will consider an aborted task to be "Failed"
1874 self.task.finish(bool(self.success))
1875
showardf85a0b72009-10-07 20:48:45 +00001876 if self.monitor:
1877 if self.monitor.has_process():
1878 self._copy_results([self.task])
1879 if self.monitor.pidfile_id is not None:
1880 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001881
1882
Dan Shi07e09af2013-04-12 09:31:29 -07001883 def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
1884 """Remove a type of special task in all tasks, keep last one if needed.
1885
1886 @param special_task_to_remove: type of special task to be removed, e.g.,
1887 models.SpecialTask.Task.VERIFY.
1888 @param keep_last_one: True to keep the last special task if its type is
1889 the same as of special_task_to_remove.
1890
1891 """
1892 queued_special_tasks = models.SpecialTask.objects.filter(
1893 host__id=self.host.id,
1894 task=special_task_to_remove,
1895 is_active=False, is_complete=False, queue_entry=None)
1896 if keep_last_one:
1897 queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
1898 queued_special_tasks.delete()
1899
1900
showard8cc058f2009-09-08 16:26:33 +00001901class RepairTask(SpecialAgentTask):
1902 TASK_TYPE = models.SpecialTask.Task.REPAIR
1903
1904
showardd1195652009-12-08 22:21:02 +00001905 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001906 """\
1907 queue_entry: queue entry to mark failed if this repair fails.
1908 """
1909 protection = host_protections.Protection.get_string(
1910 task.host.protection)
1911 # normalize the protection name
1912 protection = host_protections.Protection.get_attr_name(protection)
1913
1914 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001915 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001916
1917 # *don't* include the queue entry in IDs -- if the queue entry is
1918 # aborted, we want to leave the repair task running
1919 self._set_ids(host=self.host)
1920
1921
1922 def prolog(self):
1923 super(RepairTask, self).prolog()
1924 logging.info("repair_task starting")
1925 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001926
1927
jadmanski0afbb632008-06-06 21:10:57 +00001928 def epilog(self):
1929 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001930
jadmanski0afbb632008-06-06 21:10:57 +00001931 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001932 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001933 else:
showard8cc058f2009-09-08 16:26:33 +00001934 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001935 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001936 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001937
1938
showarded2afea2009-07-07 20:54:07 +00001939class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001940 def _copy_to_results_repository(self):
1941 if not self.queue_entry or self.queue_entry.meta_host:
1942 return
1943
1944 self.queue_entry.set_execution_subdir()
1945 log_name = os.path.basename(self.task.execution_path())
1946 source = os.path.join(self.task.execution_path(), 'debug',
1947 'autoserv.DEBUG')
1948 destination = os.path.join(
1949 self.queue_entry.execution_path(), log_name)
1950
1951 self.monitor.try_copy_to_results_repository(
1952 source, destination_path=destination)
1953
1954
showard170873e2009-01-07 00:22:26 +00001955 def epilog(self):
1956 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001957
showard775300b2009-09-09 15:30:50 +00001958 if self.success:
1959 return
showard8fe93b52008-11-18 17:53:22 +00001960
showard775300b2009-09-09 15:30:50 +00001961 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001962 # effectively ignore failure for these hosts
1963 self.success = True
showard775300b2009-09-09 15:30:50 +00001964 return
1965
1966 if self.queue_entry:
Alex Millerf3f19452013-07-29 15:53:00 -07001967 # If we requeue a HQE, we should cancel any remaining pre-job
1968 # tasks against this host, otherwise we'll be left in a state
1969 # where a queued HQE has special tasks to run against a host.
1970 models.SpecialTask.objects.filter(
1971 queue_entry__id=self.queue_entry.id,
1972 host__id=self.host.id,
1973 is_complete=0).update(is_complete=1, success=0)
showard775300b2009-09-09 15:30:50 +00001974
Alex Millera4a78ef2013-09-03 21:23:05 -07001975 previous_provisions = models.SpecialTask.objects.filter(
1976 task=models.SpecialTask.Task.PROVISION,
1977 queue_entry_id=self.queue_entry.id).count()
Alex Miller7bcec082013-09-19 10:00:53 -07001978 if (previous_provisions >
Alex Millera4a78ef2013-09-03 21:23:05 -07001979 scheduler_config.config.max_provision_retries):
1980 self._actually_fail_queue_entry()
1981 # This abort will mark the aborted bit on the HQE itself, to
1982 # signify that we're killing it. Technically it also will do
1983 # the recursive aborting of all child jobs, but that shouldn't
1984 # matter here, as only suites have children, and those are
1985 # hostless and thus don't have provisioning.
1986 # TODO(milleral) http://crbug.com/188217
1987 # However, we can't actually do this yet, as if we set the
1988 # abort bit the FinalReparseTask will set the status of the HQE
1989 # to ABORTED, which then means that we don't show the status in
1990 # run_suite. So in the meantime, don't mark the HQE as
1991 # aborted.
1992 # queue_entry.abort()
1993 else:
1994 # requeue() must come after handling provision retries, since
1995 # _actually_fail_queue_entry needs an execution subdir.
1996 # We also don't want to requeue if we hit the provision retry
1997 # limit, since then we overwrite the PARSING state of the HQE.
1998 self.queue_entry.requeue()
1999
2000 previous_repairs = models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00002001 task=models.SpecialTask.Task.REPAIR,
Alex Millera4a78ef2013-09-03 21:23:05 -07002002 queue_entry_id=self.queue_entry.id).count()
2003 if previous_repairs >= scheduler_config.config.max_repair_limit:
showard775300b2009-09-09 15:30:50 +00002004 self.host.set_status(models.Host.Status.REPAIR_FAILED)
2005 self._fail_queue_entry()
2006 return
2007
showard9bb960b2009-11-19 01:02:11 +00002008 queue_entry = models.HostQueueEntry.objects.get(
2009 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00002010 else:
2011 queue_entry = None
2012
2013 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00002014 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00002015 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00002016 queue_entry=queue_entry,
2017 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00002018
showard8fe93b52008-11-18 17:53:22 +00002019
Alex Miller42437f92013-05-28 12:58:54 -07002020 def _should_pending(self):
2021 """
2022 Decide if we should call the host queue entry's on_pending method.
2023 We should if:
2024 1) There exists an associated host queue entry.
2025 2) The current special task completed successfully.
2026 3) There do not exist any more special tasks to be run before the
2027 host queue entry starts.
2028
2029 @returns: True if we should call pending, false if not.
2030
2031 """
2032 if not self.queue_entry or not self.success:
2033 return False
2034
2035 # We know if this is the last one when we create it, so we could add
2036 # another column to the database to keep track of this information, but
2037 # I expect the overhead of querying here to be minimal.
2038 queue_entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
2039 queued = models.SpecialTask.objects.filter(
2040 host__id=self.host.id, is_active=False,
2041 is_complete=False, queue_entry=queue_entry)
2042 queued = queued.exclude(id=self.task.id)
2043 return queued.count() == 0
2044
2045
class VerifyTask(PreJobTask):
    """Pre-job task that runs autoserv verify (-v) against a host."""

    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task):
        super(VerifyTask, self).__init__(task, ['-v'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # A single verify satisfies any manual reverify requests that piled
        # up, so drop all but the most recent queued verify for this host.
        self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
                                  keep_last_one=True)


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            # Failure handling (requeue/repair) is done by PreJobTask.
            return
        if self._should_pending():
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00002076
2077
class CleanupTask(PreJobTask):
    """Pre-job task that runs autoserv --cleanup against a host.

    Note: a cleanup can also be scheduled after a job finishes, but in that
    case it runs standalone against the host (unrelated to the job), so it
    is still not a PostJobTask.
    """

    TASK_TYPE = models.SpecialTask.Task.CLEANUP


    def __init__(self, task, recover_run_monitor=None):
        super(CleanupTask, self).__init__(task, ['--cleanup'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status(models.Host.Status.CLEANING)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.CLEANING)


    def _finish_epilog(self):
        # Nothing more to do for standalone or failed cleanups.
        if not self.queue_entry or not self.success:
            return

        no_verify = host_protections.Protection.DO_NOT_VERIFY
        needs_verify = (self.queue_entry.job.run_verify
                        and self.host.protection != no_verify)
        if not needs_verify:
            # No verify wanted: move the entry along if this was the last
            # outstanding pre-job task.
            if self._should_pending():
                self.queue_entry.on_pending()
            return

        # Queue a follow-up verify tied to the same queue entry.
        entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=self.host.id),
                queue_entry=entry,
                task=models.SpecialTask.Task.VERIFY)


    def epilog(self):
        super(CleanupTask, self).epilog()

        if self.success:
            self.host.update_field('dirty', 0)
            self.host.set_status(models.Host.Status.READY)

        self._finish_epilog()
2126
2127
class ResetTask(PreJobTask):
    """Task to reset a DUT, including cleanup and verify."""
    # Note: a reset can also run post-job, but then it runs standalone
    # against the host (unrelated to the job), so it is not a PostJobTask.

    TASK_TYPE = models.SpecialTask.Task.RESET


    def __init__(self, task, recover_run_monitor=None):
        super(ResetTask, self).__init__(task, ['--reset'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(ResetTask, self).prolog()
        logging.info('starting reset task for host: %s',
                     self.host.hostname)
        self.host.set_status(models.Host.Status.RESETTING)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.RESETTING)

        # A reset subsumes cleanup and verify, so queued cleanups and
        # reverifies for this host are redundant; of the queued resets only
        # the most recent one needs to survive.
        for task_type, keep_last in (
                (models.SpecialTask.Task.CLEANUP, False),
                (models.SpecialTask.Task.VERIFY, False),
                (models.SpecialTask.Task.RESET, True)):
            self.remove_special_tasks(task_type, keep_last_one=keep_last)


    def epilog(self):
        super(ResetTask, self).epilog()

        if self.success:
            self.host.update_field('dirty', 0)

        if self._should_pending():
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
Dan Shi07e09af2013-04-12 09:31:29 -07002173
2174
class ProvisionTask(PreJobTask):
    """Pre-job task that provisions a host per the job's dependency labels."""

    TASK_TYPE = models.SpecialTask.Task.PROVISION

    def __init__(self, task):
        # Provisioning requires that we be associated with a job/queue entry
        assert task.queue_entry, "No HQE associated with provision task!"
        # task.queue_entry is an afe model HostQueueEntry object.
        # self.queue_entry is a scheduler models HostQueueEntry object, but
        # it gets constructed and assigned in __init__, so it's not available
        # yet.  Therefore, we're stuck pulling labels off of the afe model
        # so that we can pass the --provision args into the __init__ call.
        labels = {x.name for x in task.queue_entry.job.dependency_labels.all()}
        _, provisionable = provision.filter_labels(labels)
        extra_command_args = ['--provision', ','.join(provisionable)]
        super(ProvisionTask, self).__init__(task, extra_command_args)
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def _command_line(self):
        """Build the autoserv command line for this provision run."""
        # If we give queue_entry to _autoserv_command_line, then it will append
        # -c for this invocation if the queue_entry is a client side test. We
        # don't want that, as it messes with provisioning, so we just drop it
        # from the arguments here.
        # Note that we also don't verify job_repo_url as provisioning tasks are
        # required to stage whatever content we need, and the job itself will
        # force autotest to be staged if it isn't already.
        return _autoserv_command_line(self.host.hostname,
                                      self._extra_command_args)


    def prolog(self):
        super(ProvisionTask, self).prolog()
        # TODO: add a check for a previous provision task and abort if one
        # exists.
        logging.info("starting provision task for host: %s", self.host.hostname)
        self.queue_entry.set_status(
                models.HostQueueEntry.Status.PROVISIONING)
        self.host.set_status(models.Host.Status.PROVISIONING)


    def epilog(self):
        super(ProvisionTask, self).epilog()

        # PreJobTask.epilog() handles the failure path (requeue/repair);
        # on success the HQE moves to pending once no pre-job tasks remain.
        if self._should_pending():
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
2221
2222
class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
    """
    Common functionality for QueueTask and HostlessQueueTask
    """
    def __init__(self, queue_entries):
        """
        @param queue_entries: non-empty list of scheduler-model
                HostQueueEntry objects; the first entry's job is taken as
                the job for the whole task.
        """
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries


    def _keyval_path(self):
        # Keyvals live at the top of the job's execution directory.
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _write_control_file(self, execution_path):
        """Attach the job's control file to the execution on the drone and
        return its path there."""
        control_path = _drone_manager.attach_file_to_execution(
                execution_path, self.job.control_file)
        return control_path


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _command_line(self):
        """Build the autoserv invocation that runs this job."""
        execution_path = self.queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        # Hostless entries contribute no machine to the -m host list.
        hostnames = ','.join(entry.host.hostname
                             for entry in self.queue_entries
                             if not entry.is_hostless())

        execution_tag = self.queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self.job, verbose=False)
        if self.job.is_image_update_job():
            params += ['--image', self.job.update_image_path]

        return params


    @property
    def num_processes(self):
        # One process slot per queue entry in this job group.
        return len(self.queue_entries)


    @property
    def owner_username(self):
        return self.job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def prolog(self):
        """Write the pre-job keyvals and mark all entries RUNNING."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = self.job.keyval_dict()
        keyval_dict[queued_key] = queued_time
        group_name = self.queue_entries[0].get_group_name()
        if group_name:
            keyval_dict['host_group_name'] = group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.set_started_on_now()


    def _write_lost_process_error_file(self):
        # Leave a marker in the results dir recording that autoserv's
        # process was lost.
        error_file_path = os.path.join(self._working_directory(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Write the job-finished keyval, plus a lost-process marker if the
        autoserv process disappeared.  No-op if the task never started."""
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()


    def _write_status_comment(self, comment):
        # Append an INFO line to the job's status.log on the drone.
        _drone_manager.write_lines_to_file(
                os.path.join(self._working_directory(), 'status.log'),
                ['INFO\t----\t----\t' + comment],
                paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job and when, as keyvals and status.log."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        # TODO(showard): this conditional is now obsolete, we just need to leave
        # it in temporarily for backwards compatibility over upgrades. delete
        # soon.
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
                aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002355
2356
class QueueTask(AbstractQueueTask):
    """Runs a job's autoserv process against its assigned hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        # Record per-host keyvals and mark each host busy and dirty.
        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)


    def _finish_task(self):
        super(QueueTask, self)._finish_task()

        # Hand the entries off to the log-gathering phase; hosts remain
        # RUNNING until crashinfo collection decides what to do with them.
        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        return super(QueueTask, self)._command_line() + [
                '--verify_job_repo_url']
2390
2391
class HostlessQueueTask(SelfThrottledTask, AbstractQueueTask):
    """Runs a hostless job's autoserv process, subject to a process cap."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        # There is no host to derive an execution subdir from, so use a
        # fixed name before the base prolog needs the execution path.
        entry = self.queue_entries[0]
        entry.update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()
        entry = self.queue_entries[0]
        entry.set_status(models.HostQueueEntry.Status.PARSING)


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_hostless_processes
2411
2412
class PostJobTask(AgentTask):
    """Base for tasks that run after a job's autoserv process has exited:
    log gathering, final parsing and result archiving."""

    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)
        self.queue_entries = queue_entries
        # Attach to the finished autoserv process so its exit status can be
        # read back from the pidfile in the job's working directory.
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        if _testing_mode:
            return 'true'
        results_path = _drone_manager.absolute_path(self._working_directory())
        return self._generate_command(results_path)


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        """Return whether the job's entries were aborted; on disagreement
        between entries, notify by email and assume aborted."""
        aborted = None
        for entry in self.queue_entries:
            entry.update_from_database()
            current = bool(entry.aborted)
            if aborted is None:
                # The first entry establishes the expected abort state.
                aborted = current
            elif aborted != current:
                # Entries disagree; report it and conservatively treat the
                # job as aborted rather than crashing.
                details = ['%s (aborted: %s)' % (e, e.aborted)
                           for e in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state:\n' +
                        '\n'.join(details))
                return True
        return aborted


    def _final_status(self):
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # Autoserv's exit status (via the pidfile monitor) decides between
        # success and failure.
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for entry in self.queue_entries:
            entry.set_status(status)


    def abort(self):
        # Deliberately do nothing: post-job tasks must keep running when the
        # job is aborted, so AgentTask.abort()'s process kill is skipped.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]
2490
2491
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        # The job must be captured before the base __init__, which needs
        # the queue entries' shared execution path.
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
                queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        """Build the autoserv --collect-crashinfo invocation."""
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [_autoserv_path , '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        # One process slot per queue entry in this job group.
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        # Kick off final parsing for the job, then decide each host's fate.
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        """Schedule a cleanup (reboot) for each host, or mark it READY.

        A reboot is scheduled when the job was aborted, the job's
        reboot_after setting demands one, or any test failed.
        """
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            # No autoserv process found: treat the run as unsuccessful with
            # no test results.
            final_success = False
            num_tests_failed = 0
        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0)
                or num_tests_failed > 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        """Run crashinfo collection only when autoserv died abnormally."""
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002578
2579
class FinalReparseTask(SelfThrottledTask, PostJobTask):
    """Runs the results parser over a finished job, subject to a process
    cap."""

    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # Intentionally avoid _set_ids here: parsing must not claim host_ids.
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        command = [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o']
        command.append(results_dir)
        return command


    @property
    def num_processes(self):
        # Parser processes are excluded from drone process accounting.
        return 0


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002618
2619
class ArchiveResultsTask(SelfThrottledTask, PostJobTask):
    """Runs the archiving control file over a parsed job's results,
    subject to a process cap."""

    # Marker file written into the results dir when archiving fails.
    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # Intentionally avoid _set_ids here: archiving must not claim
        # host_ids.
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        control_file = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                                    'archive_results.control.srv')
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                control_file]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        archiver = self._paired_with_monitor()
        if not self.success and archiver.has_process():
            # Record the failure in a marker file inside the results dir.
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            message = ('Archiving failed with exit code %s'
                       % self.monitor.exit_code())
            _drone_manager.write_lines_to_file(
                    failed_file, [message],
                    paired_with_process=archiver.get_process())
        self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002667
2668
if __name__ == '__main__':
    # Script entry point; main() is defined earlier in this file.
    main()