#!/usr/bin/python -u
#pylint: disable-msg=C0111

"""
Autotest scheduler
"""


import datetime, optparse, os, signal
import sys, time, traceback, urllib
import logging, gc

import common
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.server import autoserv_utils
from autotest_lib.server.cros import provision
from autotest_lib.site_utils.graphite import stats

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_directory = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server')
_autoserv_path = os.path.join(_autoserv_directory, 'autoserv')
_testing_mode = False
_drone_manager = None

def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
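
# Note: _parser_path is resolved through the site_* extension hook above: if
# autotest_lib.scheduler.site_monitor_db defines parser_path(), that override
# is used; otherwise _parser_path_default() supplies <install_dir>/tko/parse.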


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60
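
# Scheduler settings used throughout this module come from the SCHEDULER
# section of the global config. An illustrative (not authoritative) excerpt,
# limited to keys read in this file, might look like:
#   [SCHEDULER]
#   enable_scheduler: True
#   drones: localhost
#   results_host: localhost
#   pidfile_timeout_mins: 5
#   gc_stats_interval_mins: 360
#   tick_debug: False
#   extra_debugging: False
#   die_on_orphans: False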


def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()

def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
        logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
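
# A hypothetical (illustrative only) return value for a one-machine server job,
# restricted to the flags named in the docstring above, might look roughly
# like:
#   [_autoserv_path, '-m', 'host1', '-u', 'job_owner', '-l', 'job_name', '-s',
#    extra_args...]
# The exact flag set is owned by autoserv_utils.autoserv_run_job_command.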


class BaseDispatcher(object):


    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def _log_extra_msg(self, msg):
        if self._extra_debugging:
            logging.debug(msg)


    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatcher's list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to two maps: one keyed by the queue_entry_ids given to it and
        one keyed by the host_ids given to it. So theoretically, a host can
        have any number of agents associated with it, and each of them can
        have any special agent task, though in practice we never see more
        than one agent/task per host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all HQEs in the specified states.

        Loosely this translates to taking an HQE in one of the specified
        states, say parsing, and getting an AgentTask for it, like the
        FinalReparseTask, through _get_agent_task_for_queue_entry. Each queue
        entry can only have one agent task at a time, but there might be
        multiple queue entries in the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying
        # is handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a null HQE. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a
        SpecialAgentTask object contains an HQE, it's passed on to the special
        agent task, which creates a HostQueueEntry and saves it as its
        queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask,
                                      ResetTask, ProvisionTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where="status = '%s'" % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.

        @return: list of afe.models.SpecialTasks sorted according to priority.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY,
                               models.SpecialTask.Task.RESET,
                               models.SpecialTask.Task.PROVISION]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)
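
    # Illustrative example of _get_prioritized_special_tasks' ordering: queued
    # tasks [VERIFY(host1), PROVISION(host2), REPAIR(host3)] come back as
    # [REPAIR(host3), VERIFY(host1), PROVISION(host2)], because REPAIR sorts
    # before VERIFY, which sorts before PROVISION in task_priority_order.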


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.

        Special tasks include PreJobTasks like verify, reset and cleanup.
        They are created through _schedule_new_jobs and associated with an
        HQE. This method translates SpecialTasks to the appropriate AgentTask
        and adds them to the dispatcher's agents list, so _handle_agents can
        execute them.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning', "
                "'Provisioning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='afe_jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
        """
        Find any new HQEs and call schedule_pre_job_tasks for each.

        This involves setting the status of the HQE and creating a row in the
        db corresponding to the special task, through
        scheduler_models._queue_special_task. The new db row is then added as
        an agent to the dispatcher through _schedule_special_tasks and
        scheduled for execution on the drone through _handle_agents.
        """
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        new_hostless_jobs = 0
        new_atomic_groups = 0
        new_jobs_with_hosts = 0
        new_jobs_need_hosts = 0

        logging.debug('Processing %d queue_entries', len(queue_entries))
        for queue_entry in queue_entries:
            self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._log_extra_msg('Scheduling hostless job.')
                self._schedule_hostless_job(queue_entry)
                new_hostless_jobs = new_hostless_jobs + 1
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
                new_atomic_groups = new_atomic_groups + 1
            else:
                new_jobs_need_hosts = new_jobs_need_hosts + 1
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)
                    new_jobs_with_hosts = new_jobs_with_hosts + 1

        key = 'scheduler.jobs_per_tick'
        stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
        stats.Gauge(key).send('new_atomic_groups', new_atomic_groups)
        stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
        stats.Gauge(key).send('new_jobs_without_hosts',
                              new_jobs_need_hosts - new_jobs_with_hosts)


    def _schedule_running_host_queue_entries(self):
        """
        Adds agents to the dispatcher.

        Any AgentTask, like the QueueTask, is wrapped in an Agent. The
        QueueTask for example, will have a job with a control file, and
        the agent will have methods that poll, abort and check if the queue
        task is finished. The dispatcher runs the agent_task, as well as
        other agents in its _agents member, through _handle_agents, by
        calling the Agent's tick().

        This method creates an agent for each HQE in one of (starting, running,
        gathering, parsing, archiving) states, and adds it to the dispatcher so
        it is handled by _handle_agents.
        """
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
                            queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        """
        Looks through the afe_host_queue_entries for an aborted entry.

        The aborted bit is set on an HQE in many ways, the most common
        being when a user requests an abort through the frontend, which
        results in an rpc from the afe to abort_host_queue_entries.
        """
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted=1 and complete=0'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _find_aborted_special_tasks(self):
        """
        Find SpecialTasks that have been marked for abortion.

        Poll the database looking for SpecialTasks that are active
        and have been marked for abortion, then abort them.
        """

        # The completed and active bits are very important when it comes
        # to scheduler correctness. The active bit is set through the prolog
        # of a special task, and reset through the cleanup method of the
        # SpecialAgentTask. The cleanup is called both through the abort and
        # epilog. The complete bit is set in several places, and in general
        # a hanging job will have is_active=1 is_complete=0, while a special
        # task which completed will have is_active=0 is_complete=1. To check
        # aborts we directly check active because the complete bit is set in
        # several places, including the epilog of agent tasks.
        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
                                                          is_aborted=True)
        for task in aborted_tasks:
            # There are 2 ways to get the agent associated with a task,
            # through the host and through the hqe. A special task
            # always needs a host, but doesn't always need a hqe.
            for agent in self._host_agents.get(task.host.id, []):
                if isinstance(agent.task, SpecialAgentTask):

                    # The epilog performs critical actions such as
                    # queueing the next SpecialTask, requeuing the
                    # hqe etc, however it doesn't actually kill the
                    # monitor process and set the 'done' bit. Epilogs
                    # assume that the job failed, and that the monitor
                    # process has already written an exit code. The
                    # done bit is a necessary condition for
                    # _handle_agents to schedule any more special
                    # tasks against the host, and it must be set
                    # in addition to is_active, is_complete and success.
                    agent.task.epilog()
                    agent.task.abort()


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
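
    # Worked example of _can_start_agent's per-cycle throttling (illustrative
    # numbers): with max_processes_started_per_cycle = 16 and 10 processes
    # already started this cycle, an agent needing 8 processes is deferred
    # (10 + 8 > 16); the same agent would have been allowed as the first agent
    # of the cycle regardless of its size.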


    def _handle_agents(self):
        """
        Handles agents of the dispatcher.

        Appropriate Agents are added to the dispatcher through
        _schedule_running_host_queue_entries. These agents each
        have a task. This method runs the agent's task through
        agent.tick() leading to:
            agent.start
                prolog -> AgentTask's prolog
                    For each queue entry:
                        sets host status/status to Running
                        sets started_on in afe_host_queue_entries
                run -> AgentTask's run
                    Creates PidfileRunMonitor
                    Queues the autoserv command line for this AgentTask
                    via the drone manager. These commands are executed
                    through the drone manager's execute actions.
                poll -> AgentTask's/BaseAgentTask's poll
                    checks the monitor's exit_code.
                    Executes epilog if task is finished.
                    Executes AgentTask's _finish_task
                    finish_task is usually responsible for setting the status
                    of the HQE/host, and updating its active and complete
                    fields.

            agent.is_done
                Removes the agent from the dispatcher's _agents queue.
                is_done checks the finished bit on the agent, which is
                set based on the Agent's task. During the agent's poll
                we check to see if the monitor process has exited in
                its finish method, and set the success member of the
                task based on this exit code.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_cycle += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes. %d added this cycle.',
                     _drone_manager.total_running_processes(),
                     num_started_this_cycle)


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


SiteDispatcher = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'SiteDispatcher', BaseDispatcher)


class Dispatcher(SiteDispatcher):
    pass


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but
        only used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, num_processes, nice_level=None,
            log_file=None, pidfile_name=None, paired_with_pidfile=None,
            username=None, drone_hostnames_allowed=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            num_processes=num_processes, log_file=log_file,
            paired_with_pidfile=paired_with_pidfile, username=username,
            drone_hostnames_allowed=drone_hostnames_allowed)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=drone_manager.AUTOSERV_PID_FILE,
                                   num_processes=None):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_path, pidfile_name=pidfile_name)
        if num_processes is not None:
            _drone_manager.declare_process_count(self.pidfile_id, num_processes)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
showard21baa452008-10-21 00:08:39 +00001161 self._handle_pidfile_error(
1162 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001163
showard21baa452008-10-21 00:08:39 +00001164
1165 def _get_pidfile_info(self):
1166 """\
1167 After completion, self._state will contain:
1168 process=None, exit_status=None if autoserv has not yet run
1169 process!=None, exit_status=None if autoserv is running
1170 process!=None, exit_status!=None if autoserv has completed
1171 """
1172 try:
1173 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001174 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001175 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001176
1177
showard170873e2009-01-07 00:22:26 +00001178 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001179 """\
1180 Called when no pidfile is found or no pid is in the pidfile.
1181 """
showard170873e2009-01-07 00:22:26 +00001182 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001183 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001184 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001185 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001186 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001187
1188
showard35162b02009-03-03 02:17:30 +00001189 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001190 """\
1191 Called when autoserv has exited without writing an exit status,
1192 or we've timed out waiting for autoserv to write a pid to the
1193 pidfile. In either case, we just return failure and the caller
1194 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001195
showard170873e2009-01-07 00:22:26 +00001196 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001197 """
1198 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001199 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001200 self._state.exit_status = 1
1201 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001202
1203
jadmanski0afbb632008-06-06 21:10:57 +00001204 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001205 self._get_pidfile_info()
1206 return self._state.exit_status
1207
1208
1209 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001210 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001211 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001212 if self._state.num_tests_failed is None:
1213 return -1
showard21baa452008-10-21 00:08:39 +00001214 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001215
1216
showardcdaeae82009-08-31 18:32:48 +00001217 def try_copy_results_on_drone(self, **kwargs):
1218 if self.has_process():
1219 # copy results logs into the normal place for job results
1220 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1221
1222
1223 def try_copy_to_results_repository(self, source, **kwargs):
1224 if self.has_process():
1225 _drone_manager.copy_to_results_repository(self.get_process(),
1226 source, **kwargs)
1227
1228
mbligh36768f02008-02-22 18:28:33 +00001229class Agent(object):
showard77182562009-06-10 00:16:05 +00001230 """
Alex Miller47715eb2013-07-24 03:34:01 -07001231 An agent for use by the Dispatcher class to perform a task. An agent wraps
1232 around an AgentTask mainly to associate the AgentTask with the queue_entry
1233 and host ids.
showard77182562009-06-10 00:16:05 +00001234
1235 The following methods are required on all task objects:
1236 poll() - Called periodically to let the task check its status and
1237 update its internal state.
1238 is_done() - Returns True if the task is finished.
1239 abort() - Called when an abort has been requested. The task must
1240 set its aborted attribute to True if it actually aborted.
1241
1242 The following attributes are required on all task objects:
1243 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001244 success - bool, True if this task succeeded.
1245 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1246 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001247 """
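    # A minimal sketch of an object satisfying the task interface above (for
    # illustration only; no such task exists in this module):
    #
    #   class NoopTask(object):
    #       def __init__(self):
    #           self.aborted = False
    #           self.success = None
    #           self.queue_entry_ids = []
    #           self.host_ids = []
    #           self._done = False
    #       def poll(self):
    #           self.success = True
    #           self._done = True
    #       def is_done(self):
    #           return self._done
    #       def abort(self):
    #           self.aborted = True
    #           self._done = True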
1248
1249
showard418785b2009-11-23 20:19:59 +00001250 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001251 """
Alex Miller47715eb2013-07-24 03:34:01 -07001252 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001253 """
showard8cc058f2009-09-08 16:26:33 +00001254 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001255
showard77182562009-06-10 00:16:05 +00001256 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001257 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001258
showard8cc058f2009-09-08 16:26:33 +00001259 self.queue_entry_ids = task.queue_entry_ids
1260 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001261
showard8cc058f2009-09-08 16:26:33 +00001262 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001263 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001264
1265
jadmanski0afbb632008-06-06 21:10:57 +00001266 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001267 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001268 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001269 self.task.poll()
1270 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001271 self.finished = True
showardec113162008-05-08 00:52:49 +00001272
1273
jadmanski0afbb632008-06-06 21:10:57 +00001274 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001275 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001276
1277
showardd3dc1992009-04-22 21:01:40 +00001278 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001279 if self.task:
1280 self.task.abort()
1281 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001282 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001283 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001284
showardd3dc1992009-04-22 21:01:40 +00001285
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001286class BaseAgentTask(object):
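    # Lifecycle, as implemented below: the owning Agent calls poll() each tick.
    # poll() calls start() the first time through (running prolog() and run())
    # and then tick() until the task is done; tick() reads the monitor's exit
    # code and, once it is available, calls finished(), which runs epilog().
    # abort() kills the monitored process and marks the task as aborted.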
showardd1195652009-12-08 22:21:02 +00001287 class _NullMonitor(object):
1288 pidfile_id = None
1289
1290 def has_process(self):
1291 return True
1292
1293
1294 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001295 """
showardd1195652009-12-08 22:21:02 +00001296 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001297 """
jadmanski0afbb632008-06-06 21:10:57 +00001298 self.done = False
showardd1195652009-12-08 22:21:02 +00001299 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001300 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001301 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001302 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001303 self.queue_entry_ids = []
1304 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001305 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001306
1307
1308 def _set_ids(self, host=None, queue_entries=None):
1309 if queue_entries and queue_entries != [None]:
1310 self.host_ids = [entry.host.id for entry in queue_entries]
1311 self.queue_entry_ids = [entry.id for entry in queue_entries]
1312 else:
1313 assert host
1314 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001315
1316
jadmanski0afbb632008-06-06 21:10:57 +00001317 def poll(self):
showard08a36412009-05-05 01:01:13 +00001318 if not self.started:
1319 self.start()
showardd1195652009-12-08 22:21:02 +00001320 if not self.done:
1321 self.tick()
showard08a36412009-05-05 01:01:13 +00001322
1323
1324 def tick(self):
showardd1195652009-12-08 22:21:02 +00001325 assert self.monitor
1326 exit_code = self.monitor.exit_code()
1327 if exit_code is None:
1328 return
mbligh36768f02008-02-22 18:28:33 +00001329
showardd1195652009-12-08 22:21:02 +00001330 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001331 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001332
1333
jadmanski0afbb632008-06-06 21:10:57 +00001334 def is_done(self):
1335 return self.done
mbligh36768f02008-02-22 18:28:33 +00001336
1337
jadmanski0afbb632008-06-06 21:10:57 +00001338 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001339 if self.done:
showardd1195652009-12-08 22:21:02 +00001340 assert self.started
showard08a36412009-05-05 01:01:13 +00001341 return
showardd1195652009-12-08 22:21:02 +00001342 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001343 self.done = True
1344 self.success = success
1345 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001346
1347
jadmanski0afbb632008-06-06 21:10:57 +00001348 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001349 """
1350 To be overridden.
1351 """
showarded2afea2009-07-07 20:54:07 +00001352 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001353 self.register_necessary_pidfiles()
1354
1355
1356 def _log_file(self):
1357 if not self._log_file_name:
1358 return None
1359 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001360
mbligh36768f02008-02-22 18:28:33 +00001361
jadmanski0afbb632008-06-06 21:10:57 +00001362 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001363 log_file = self._log_file()
1364 if self.monitor and log_file:
1365 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001366
1367
jadmanski0afbb632008-06-06 21:10:57 +00001368 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001369 """
1370 To be overridden.
1371 """
jadmanski0afbb632008-06-06 21:10:57 +00001372 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001373 logging.info("%s finished with success=%s", type(self).__name__,
1374 self.success)
1375
mbligh36768f02008-02-22 18:28:33 +00001376
jadmanski0afbb632008-06-06 21:10:57 +00001377 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001378 if not self.started:
1379 self.prolog()
1380 self.run()
1381
1382 self.started = True
1383
1384
1385 def abort(self):
1386 if self.monitor:
1387 self.monitor.kill()
1388 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001389 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001390 self.cleanup()
1391
1392
showarded2afea2009-07-07 20:54:07 +00001393 def _get_consistent_execution_path(self, execution_entries):
1394 first_execution_path = execution_entries[0].execution_path()
1395 for execution_entry in execution_entries[1:]:
1396 assert execution_entry.execution_path() == first_execution_path, (
1397 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1398 execution_entry,
1399 first_execution_path,
1400 execution_entries[0]))
1401 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001402
1403
showarded2afea2009-07-07 20:54:07 +00001404 def _copy_results(self, execution_entries, use_monitor=None):
1405 """
1406 @param execution_entries: list of objects with execution_path() method
1407 """
showard6d1c1432009-08-20 23:30:39 +00001408 if use_monitor is not None and not use_monitor.has_process():
1409 return
1410
showarded2afea2009-07-07 20:54:07 +00001411 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001412 if use_monitor is None:
1413 assert self.monitor
1414 use_monitor = self.monitor
1415 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001416 execution_path = self._get_consistent_execution_path(execution_entries)
1417 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001418 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001419
showarda1e74b32009-05-12 17:32:04 +00001420
1421 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001422 for queue_entry in queue_entries:
1423 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001424
1425
mbligh4608b002010-01-05 18:22:35 +00001426 def _archive_results(self, queue_entries):
1427 for queue_entry in queue_entries:
1428 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001429
1430
showardd1195652009-12-08 22:21:02 +00001431 def _command_line(self):
1432 """
1433 Return the command line to run. Must be overridden.
1434 """
1435 raise NotImplementedError
1436
1437
1438 @property
1439 def num_processes(self):
1440 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001441 Return the number of processes forked by this BaseAgentTask's process.
1442 It may only be approximate. To be overridden if necessary.
showardd1195652009-12-08 22:21:02 +00001443 """
1444 return 1
1445
1446
1447 def _paired_with_monitor(self):
1448 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001449 If this BaseAgentTask's process must run on the same machine as some
showardd1195652009-12-08 22:21:02 +00001450 previous process, this method should be overridden to return a
1451 PidfileRunMonitor for that process.
1452 """
1453 return self._NullMonitor()
1454
1455
1456 @property
1457 def owner_username(self):
1458 """
1459 Return login of user responsible for this task. May be None. Must be
1460 overridden.
1461 """
1462 raise NotImplementedError
1463
1464
1465 def _working_directory(self):
1466 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001467 Return the directory where this BaseAgentTask's process executes.
1468 Must be overridden.
showardd1195652009-12-08 22:21:02 +00001469 """
1470 raise NotImplementedError
1471
1472
1473 def _pidfile_name(self):
1474 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001475 Return the name of the pidfile this BaseAgentTask's process uses. To be
showardd1195652009-12-08 22:21:02 +00001476 overridden if necessary.
1477 """
jamesrenc44ae992010-02-19 00:12:54 +00001478 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001479
1480
1481 def _check_paired_results_exist(self):
1482 if not self._paired_with_monitor().has_process():
1483 email_manager.manager.enqueue_notify_email(
1484 'No paired results in task',
1485 'No paired results in task %s at %s'
1486 % (self, self._paired_with_monitor().pidfile_id))
1487 self.finished(False)
1488 return False
1489 return True
1490
1491
1492 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001493 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001494 self.monitor = PidfileRunMonitor()
1495
1496
1497 def run(self):
1498 if not self._check_paired_results_exist():
1499 return
1500
1501 self._create_monitor()
1502 self.monitor.run(
1503 self._command_line(), self._working_directory(),
1504 num_processes=self.num_processes,
1505 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1506 pidfile_name=self._pidfile_name(),
1507 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001508 username=self.owner_username,
1509 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1510
1511
1512 def get_drone_hostnames_allowed(self):
1513 if not models.DroneSet.drone_sets_enabled():
1514 return None
1515
1516 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1517 if not hqes:
1518 # Only special tasks could be missing host queue entries
1519 assert isinstance(self, SpecialAgentTask)
1520 return self._user_or_global_default_drone_set(
1521 self.task, self.task.requested_by)
1522
1523 job_ids = hqes.values_list('job', flat=True).distinct()
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001524 assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
jamesren76fcf192010-04-21 20:39:50 +00001525 "span multiple jobs")
1526
1527 job = models.Job.objects.get(id=job_ids[0])
1528 drone_set = job.drone_set
1529 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001530 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001531
1532 return drone_set.get_drone_hostnames()
1533
1534
1535 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1536 """
1537 Returns the user's default drone set, if present.
1538
1539 Otherwise, returns the global default drone set.
1540 """
1541 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1542 if not user:
1543 logging.warn('%s had no owner; using default drone set',
1544 obj_with_owner)
1545 return default_hostnames
1546 if not user.drone_set:
1547 logging.warn('User %s has no default drone set, using global '
1548 'default', user.login)
1549 return default_hostnames
1550 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001551
1552
1553 def register_necessary_pidfiles(self):
1554 pidfile_id = _drone_manager.get_pidfile_id_from(
1555 self._working_directory(), self._pidfile_name())
1556 _drone_manager.register_pidfile(pidfile_id)
1557
1558 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1559 if paired_pidfile_id:
1560 _drone_manager.register_pidfile(paired_pidfile_id)
1561
1562
1563 def recover(self):
1564 if not self._check_paired_results_exist():
1565 return
1566
1567 self._create_monitor()
1568 self.monitor.attach_to_existing_process(
1569 self._working_directory(), pidfile_name=self._pidfile_name(),
1570 num_processes=self.num_processes)
1571 if not self.monitor.has_process():
1572 # no process to recover; wait to be started normally
1573 self.monitor = None
1574 return
1575
1576 self.started = True
Aviv Keshet225bdfe2013-03-05 10:10:08 -08001577 logging.info('Recovering process %s for %s at %s',
1578 self.monitor.get_process(), type(self).__name__,
1579 self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001580
1581
mbligh4608b002010-01-05 18:22:35 +00001582 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1583 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001584 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001585 for entry in queue_entries:
1586 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001587 raise host_scheduler.SchedulerError(
1588 '%s attempting to start entry with invalid status %s: '
1589 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001590 invalid_host_status = (
1591 allowed_host_statuses is not None
1592 and entry.host.status not in allowed_host_statuses)
1593 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001594 raise host_scheduler.SchedulerError(
1595 '%s attempting to start on queue entry with invalid '
1596 'host status %s: %s'
1597 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001598
1599
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001600SiteAgentTask = utils.import_site_class(
1601 __file__, 'autotest_lib.scheduler.site_monitor_db',
1602 'SiteAgentTask', BaseAgentTask)
1603
1604class AgentTask(SiteAgentTask):
1605 pass
1606
1607
showardd9205182009-04-27 20:09:55 +00001608class TaskWithJobKeyvals(object):
1609 """AgentTask mixin providing functionality to help with job keyval files."""
1610 _KEYVAL_FILE = 'keyval'
1611 def _format_keyval(self, key, value):
1612 return '%s=%s' % (key, value)
1613
1614
1615 def _keyval_path(self):
1616 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001617 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001618
1619
1620 def _write_keyval_after_job(self, field, value):
1621 assert self.monitor
1622 if not self.monitor.has_process():
1623 return
1624 _drone_manager.write_lines_to_file(
1625 self._keyval_path(), [self._format_keyval(field, value)],
1626 paired_with_process=self.monitor.get_process())
1627
1628
1629 def _job_queued_keyval(self, job):
1630 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1631
1632
1633 def _write_job_finished(self):
1634 self._write_keyval_after_job("job_finished", int(time.time()))
1635
1636
showarddb502762009-09-09 15:31:20 +00001637 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1638 keyval_contents = '\n'.join(self._format_keyval(key, value)
1639 for key, value in keyval_dict.iteritems())
1640 # always end with a newline to allow additional keyvals to be written
1641 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001642 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001643 keyval_contents,
1644 file_path=keyval_path)
1645
1646
1647 def _write_keyvals_before_job(self, keyval_dict):
1648 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1649
1650
1651 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001652 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001653 host.hostname)
1654 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001655 all_labels = [urllib.quote(label) for label in all_labels]
showarddb502762009-09-09 15:31:20 +00001656 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1657 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1658
1659
showard8cc058f2009-09-08 16:26:33 +00001660class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001661 """
1662 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1663 """
1664
1665 TASK_TYPE = None
1666 host = None
1667 queue_entry = None
1668
showardd1195652009-12-08 22:21:02 +00001669 def __init__(self, task, extra_command_args):
1670 super(SpecialAgentTask, self).__init__()
1671
lmrb7c5d272010-04-16 06:34:04 +00001672 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001673
jamesrenc44ae992010-02-19 00:12:54 +00001674 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001675 self.queue_entry = None
1676 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001677 self.queue_entry = scheduler_models.HostQueueEntry(
1678 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001679
showarded2afea2009-07-07 20:54:07 +00001680 self.task = task
1681 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001682
1683
showard8cc058f2009-09-08 16:26:33 +00001684 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001685 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1686
1687
1688 def _command_line(self):
1689 return _autoserv_command_line(self.host.hostname,
1690 self._extra_command_args,
1691 queue_entry=self.queue_entry)
1692
1693
1694 def _working_directory(self):
1695 return self.task.execution_path()
1696
1697
1698 @property
1699 def owner_username(self):
1700 if self.task.requested_by:
1701 return self.task.requested_by.login
1702 return None
showard8cc058f2009-09-08 16:26:33 +00001703
1704
showarded2afea2009-07-07 20:54:07 +00001705 def prolog(self):
1706 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001707 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001708 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001709
1710
showardde634ee2009-01-30 01:44:24 +00001711 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001712 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001713
showard2fe3f1d2009-07-06 20:19:11 +00001714 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001715 return # don't fail metahost entries, they'll be reassigned
1716
showard2fe3f1d2009-07-06 20:19:11 +00001717 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001718 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001719 return # entry has been aborted
1720
Alex Millerdfff2fd2013-05-28 13:05:06 -07001721 self._actually_fail_queue_entry()
1722
1723
1724 # TODO(milleral): http://crbug.com/268607
1725 # All this used to be a part of _fail_queue_entry. The
1726 # exact semantics of when one should and should not be failing a queue
1727 # entry need to be worked out, because provisioning has placed us in a
1728 # case where we want to fail a queue entry that could be requeued,
1729 # which trips the two early-return checks above, and thus
1730 # _fail_queue_entry() would exit early and have no effect.
1731 # What's left here with _actually_fail_queue_entry is a hack to be able to
1732 # bypass the checks and unconditionally execute the code.
1733 def _actually_fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001734 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001735 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001736 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001737 self._write_keyval_after_job(queued_key, queued_time)
1738 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001739
showard8cc058f2009-09-08 16:26:33 +00001740 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001741 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001742 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001743 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001744
showard8cc058f2009-09-08 16:26:33 +00001745 pidfile_id = _drone_manager.get_pidfile_id_from(
1746 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001747 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001748 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001749
1750 if self.queue_entry.job.parse_failed_repair:
1751 self._parse_results([self.queue_entry])
1752 else:
1753 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001754
Alex Miller23676a22013-07-03 09:03:36 -07001755 # Also fail all other special tasks that have not yet run for this HQE
1756 pending_tasks = models.SpecialTask.objects.filter(
1757 queue_entry__id=self.queue_entry.id,
1758 is_complete=0)
Alex Miller5e36ccc2013-08-03 16:31:58 -07001759 for task in pending_tasks:
1760 task.finish(False)
Alex Miller23676a22013-07-03 09:03:36 -07001761
showard8cc058f2009-09-08 16:26:33 +00001762
1763 def cleanup(self):
1764 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001765
1766 # We will consider an aborted task to be "Failed"
1767 self.task.finish(bool(self.success))
1768
showardf85a0b72009-10-07 20:48:45 +00001769 if self.monitor:
1770 if self.monitor.has_process():
1771 self._copy_results([self.task])
1772 if self.monitor.pidfile_id is not None:
1773 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001774
1775
Dan Shi07e09af2013-04-12 09:31:29 -07001776 def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
1777 """Remove a type of special task in all tasks, keep last one if needed.
1778
1779 @param special_task_to_remove: type of special task to be removed, e.g.,
1780 models.SpecialTask.Task.VERIFY.
1781 @param keep_last_one: True to keep the last special task if its type is
1782 the same as of special_task_to_remove.
1783
1784 """
1785 queued_special_tasks = models.SpecialTask.objects.filter(
1786 host__id=self.host.id,
1787 task=special_task_to_remove,
1788 is_active=False, is_complete=False, queue_entry=None)
1789 if keep_last_one:
1790 queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
1791 queued_special_tasks.delete()
1792
1793
showard8cc058f2009-09-08 16:26:33 +00001794class RepairTask(SpecialAgentTask):
1795 TASK_TYPE = models.SpecialTask.Task.REPAIR
1796
1797
showardd1195652009-12-08 22:21:02 +00001798 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001799 """\
1800 task: the SpecialTask to run; its queue entry, if any, is marked failed if this repair fails.
1801 """
1802 protection = host_protections.Protection.get_string(
1803 task.host.protection)
1804 # normalize the protection name
1805 protection = host_protections.Protection.get_attr_name(protection)
1806
1807 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001808 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001809
1810 # *don't* include the queue entry in IDs -- if the queue entry is
1811 # aborted, we want to leave the repair task running
1812 self._set_ids(host=self.host)
1813
1814
1815 def prolog(self):
1816 super(RepairTask, self).prolog()
1817 logging.info("repair_task starting")
1818 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001819
1820
jadmanski0afbb632008-06-06 21:10:57 +00001821 def epilog(self):
1822 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001823
jadmanski0afbb632008-06-06 21:10:57 +00001824 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001825 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001826 else:
showard8cc058f2009-09-08 16:26:33 +00001827 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001828 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001829 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001830
1831
showarded2afea2009-07-07 20:54:07 +00001832class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001833 def _copy_to_results_repository(self):
1834 if not self.queue_entry or self.queue_entry.meta_host:
1835 return
1836
1837 self.queue_entry.set_execution_subdir()
1838 log_name = os.path.basename(self.task.execution_path())
1839 source = os.path.join(self.task.execution_path(), 'debug',
1840 'autoserv.DEBUG')
1841 destination = os.path.join(
1842 self.queue_entry.execution_path(), log_name)
1843
1844 self.monitor.try_copy_to_results_repository(
1845 source, destination_path=destination)
1846
1847
showard170873e2009-01-07 00:22:26 +00001848 def epilog(self):
1849 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001850
showard775300b2009-09-09 15:30:50 +00001851 if self.success:
1852 return
showard8fe93b52008-11-18 17:53:22 +00001853
showard775300b2009-09-09 15:30:50 +00001854 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001855 # effectively ignore failure for these hosts
1856 self.success = True
showard775300b2009-09-09 15:30:50 +00001857 return
1858
1859 if self.queue_entry:
Alex Millerf3f19452013-07-29 15:53:00 -07001860 # If we requeue a HQE, we should cancel any remaining pre-job
1861 # tasks against this host, otherwise we'll be left in a state
1862 # where a queued HQE has special tasks to run against a host.
1863 models.SpecialTask.objects.filter(
1864 queue_entry__id=self.queue_entry.id,
1865 host__id=self.host.id,
1866 is_complete=0).update(is_complete=1, success=0)
showard775300b2009-09-09 15:30:50 +00001867
Alex Millera4a78ef2013-09-03 21:23:05 -07001868 previous_provisions = models.SpecialTask.objects.filter(
1869 task=models.SpecialTask.Task.PROVISION,
1870 queue_entry_id=self.queue_entry.id).count()
Alex Miller7bcec082013-09-19 10:00:53 -07001871 if (previous_provisions >
Alex Millera4a78ef2013-09-03 21:23:05 -07001872 scheduler_config.config.max_provision_retries):
1873 self._actually_fail_queue_entry()
1874 # This abort will mark the aborted bit on the HQE itself, to
1875 # signify that we're killing it. Technically it also will do
1876 # the recursive aborting of all child jobs, but that shouldn't
1877 # matter here, as only suites have children, and those are
1878 # hostless and thus don't have provisioning.
1879 # TODO(milleral) http://crbug.com/188217
1880 # However, we can't actually do this yet, as if we set the
1881 # abort bit the FinalReparseTask will set the status of the HQE
1882 # to ABORTED, which then means that we don't show the status in
1883 # run_suite. So in the meantime, don't mark the HQE as
1884 # aborted.
1885 # queue_entry.abort()
1886 else:
1887 # requeue() must come after handling provision retries, since
1888 # _actually_fail_queue_entry needs an execution subdir.
1889 # We also don't want to requeue if we hit the provision retry
1890 # limit, since then we overwrite the PARSING state of the HQE.
1891 self.queue_entry.requeue()
1892
1893 previous_repairs = models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001894 task=models.SpecialTask.Task.REPAIR,
Alex Millera4a78ef2013-09-03 21:23:05 -07001895 queue_entry_id=self.queue_entry.id).count()
1896 if previous_repairs >= scheduler_config.config.max_repair_limit:
showard775300b2009-09-09 15:30:50 +00001897 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1898 self._fail_queue_entry()
1899 return
1900
showard9bb960b2009-11-19 01:02:11 +00001901 queue_entry = models.HostQueueEntry.objects.get(
1902 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001903 else:
1904 queue_entry = None
1905
1906 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001907 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001908 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001909 queue_entry=queue_entry,
1910 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001911
showard8fe93b52008-11-18 17:53:22 +00001912
Alex Miller42437f92013-05-28 12:58:54 -07001913 def _should_pending(self):
1914 """
1915 Decide if we should call the host queue entry's on_pending method.
1916 We should if:
1917 1) There exists an associated host queue entry.
1918 2) The current special task completed successfully.
1919 3) There do not exist any more special tasks to be run before the
1920 host queue entry starts.
1921
1922 @returns: True if we should call pending, false if not.
1923
1924 """
1925 if not self.queue_entry or not self.success:
1926 return False
1927
1928 # We know if this is the last one when we create it, so we could add
1929 # another column to the database to keep track of this information, but
1930 # I expect the overhead of querying here to be minimal.
1931 queue_entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1932 queued = models.SpecialTask.objects.filter(
1933 host__id=self.host.id, is_active=False,
1934 is_complete=False, queue_entry=queue_entry)
1935 queued = queued.exclude(id=self.task.id)
1936 return queued.count() == 0
1937
1938
showard8fe93b52008-11-18 17:53:22 +00001939class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001940 TASK_TYPE = models.SpecialTask.Task.VERIFY
1941
1942
showardd1195652009-12-08 22:21:02 +00001943 def __init__(self, task):
1944 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001945 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001946
1947
jadmanski0afbb632008-06-06 21:10:57 +00001948 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001949 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001950
showardb18134f2009-03-20 20:52:18 +00001951 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001952 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001953 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1954 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001955
jamesren42318f72010-05-10 23:40:59 +00001956 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00001957 # and there's no need to keep records of other requests.
Dan Shi07e09af2013-04-12 09:31:29 -07001958 self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
1959 keep_last_one=True)
showard2fe3f1d2009-07-06 20:19:11 +00001960
mbligh36768f02008-02-22 18:28:33 +00001961
jadmanski0afbb632008-06-06 21:10:57 +00001962 def epilog(self):
1963 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001964 if self.success:
Alex Miller42437f92013-05-28 12:58:54 -07001965 if self._should_pending():
showard8cc058f2009-09-08 16:26:33 +00001966 self.queue_entry.on_pending()
1967 else:
1968 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001969
1970
mbligh4608b002010-01-05 18:22:35 +00001971class CleanupTask(PreJobTask):
1972 # note this can also run post-job, but when it does, it's running standalone
1973 # against the host (not related to the job), so it's not considered a
1974 # PostJobTask
1975
1976 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1977
1978
1979 def __init__(self, task, recover_run_monitor=None):
1980 super(CleanupTask, self).__init__(task, ['--cleanup'])
1981 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1982
1983
1984 def prolog(self):
1985 super(CleanupTask, self).prolog()
1986 logging.info("starting cleanup task for host: %s", self.host.hostname)
1987 self.host.set_status(models.Host.Status.CLEANING)
1988 if self.queue_entry:
Dan Shi07e09af2013-04-12 09:31:29 -07001989 self.queue_entry.set_status(models.HostQueueEntry.Status.CLEANING)
mbligh4608b002010-01-05 18:22:35 +00001990
1991
1992 def _finish_epilog(self):
1993 if not self.queue_entry or not self.success:
1994 return
1995
1996 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
1997 should_run_verify = (
1998 self.queue_entry.job.run_verify
1999 and self.host.protection != do_not_verify_protection)
2000 if should_run_verify:
2001 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
2002 models.SpecialTask.objects.create(
2003 host=models.Host.objects.get(id=self.host.id),
2004 queue_entry=entry,
2005 task=models.SpecialTask.Task.VERIFY)
2006 else:
Alex Miller42437f92013-05-28 12:58:54 -07002007 if self._should_pending():
2008 self.queue_entry.on_pending()
mbligh4608b002010-01-05 18:22:35 +00002009
2010
2011 def epilog(self):
2012 super(CleanupTask, self).epilog()
2013
2014 if self.success:
2015 self.host.update_field('dirty', 0)
2016 self.host.set_status(models.Host.Status.READY)
2017
2018 self._finish_epilog()
2019
2020
Dan Shi07e09af2013-04-12 09:31:29 -07002021class ResetTask(PreJobTask):
2022 """Task to reset a DUT, including cleanup and verify."""
2023 # note this can also run post-job, but when it does, it's running standalone
2024 # against the host (not related to the job), so it's not considered a
2025 # PostJobTask
2026
2027 TASK_TYPE = models.SpecialTask.Task.RESET
2028
2029
2030 def __init__(self, task, recover_run_monitor=None):
2031 super(ResetTask, self).__init__(task, ['--reset'])
2032 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
2033
2034
2035 def prolog(self):
2036 super(ResetTask, self).prolog()
2037 logging.info('starting reset task for host: %s',
2038 self.host.hostname)
2039 self.host.set_status(models.Host.Status.RESETTING)
2040 if self.queue_entry:
2041 self.queue_entry.set_status(models.HostQueueEntry.Status.RESETTING)
2042
2043 # Delete any queued cleanups for this host.
2044 self.remove_special_tasks(models.SpecialTask.Task.CLEANUP,
2045 keep_last_one=False)
2046
2047 # Delete any queued reverifies for this host.
2048 self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
2049 keep_last_one=False)
2050
2051 # Only one reset is needed.
2052 self.remove_special_tasks(models.SpecialTask.Task.RESET,
2053 keep_last_one=True)
2054
2055
2056 def epilog(self):
2057 super(ResetTask, self).epilog()
2058
2059 if self.success:
2060 self.host.update_field('dirty', 0)
Dan Shi07e09af2013-04-12 09:31:29 -07002061
Alex Millerba076c52013-07-11 10:11:48 -07002062 if self._should_pending():
Dan Shi07e09af2013-04-12 09:31:29 -07002063 self.queue_entry.on_pending()
Alex Millerdc608d52013-07-30 14:26:21 -07002064 else:
2065 self.host.set_status(models.Host.Status.READY)
Dan Shi07e09af2013-04-12 09:31:29 -07002066
2067
Alex Millerdfff2fd2013-05-28 13:05:06 -07002068class ProvisionTask(PreJobTask):
2069 TASK_TYPE = models.SpecialTask.Task.PROVISION
2070
2071 def __init__(self, task):
2072 # Provisioning requires that we be associated with a job/queue entry
2073 assert task.queue_entry, "No HQE associated with provision task!"
2074 # task.queue_entry is an afe model HostQueueEntry object.
2075 # self.queue_entry is a scheduler models HostQueueEntry object, but
2076 # it gets constructed and assigned in __init__, so it's not available
2077 # yet. Therefore, we're stuck pulling labels off of the afe model
2078 # so that we can pass the --provision args into the __init__ call.
2079 labels = {x.name for x in task.queue_entry.job.dependency_labels.all()}
2080 _, provisionable = provision.filter_labels(labels)
2081 extra_command_args = ['--provision', ','.join(provisionable)]
2082 super(ProvisionTask, self).__init__(task, extra_command_args)
2083 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
2084
2085
2086 def _command_line(self):
2087 # If we give queue_entry to _autoserv_command_line, then it will append
2088 # -c for this invocation if the queue_entry is a client side test. We
2089 # don't want that, as it messes with provisioning, so we just drop it
2090 # from the arguments here.
2091 # Note that we also don't verify job_repo_url as provisioning tasks are
2092 # required to stage whatever content we need, and the job itself will
2093 # force autotest to be staged if it isn't already.
2094 return _autoserv_command_line(self.host.hostname,
2095 self._extra_command_args)
2096
2097
2098 def prolog(self):
2099 super(ProvisionTask, self).prolog()
2100 # add check for previous provision task and abort if exist.
2101 logging.info("starting provision task for host: %s", self.host.hostname)
2102 self.queue_entry.set_status(
2103 models.HostQueueEntry.Status.PROVISIONING)
2104 self.host.set_status(models.Host.Status.PROVISIONING)
2105
2106
2107 def epilog(self):
Alex Millera4a78ef2013-09-03 21:23:05 -07002108 super(ProvisionTask, self).epilog()
Alex Millerdfff2fd2013-05-28 13:05:06 -07002109
Alex Millera4a78ef2013-09-03 21:23:05 -07002110 if self._should_pending():
Alex Millerdfff2fd2013-05-28 13:05:06 -07002111 self.queue_entry.on_pending()
2112 else:
2113 self.host.set_status(models.Host.Status.READY)
2114
2115
showarda9545c02009-12-18 22:44:26 +00002116class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
2117 """
2118 Common functionality for QueueTask and HostlessQueueTask
2119 """
2120 def __init__(self, queue_entries):
2121 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00002122 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00002123 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00002124
2125
showard73ec0442009-02-07 02:05:20 +00002126 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00002127 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00002128
2129
jamesrenc44ae992010-02-19 00:12:54 +00002130 def _write_control_file(self, execution_path):
2131 control_path = _drone_manager.attach_file_to_execution(
2132 execution_path, self.job.control_file)
2133 return control_path
2134
2135
Aviv Keshet308e7362013-05-21 14:43:16 -07002136 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00002137 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00002138 execution_path = self.queue_entries[0].execution_path()
2139 control_path = self._write_control_file(execution_path)
2140 hostnames = ','.join(entry.host.hostname
2141 for entry in self.queue_entries
2142 if not entry.is_hostless())
2143
2144 execution_tag = self.queue_entries[0].execution_tag()
2145 params = _autoserv_command_line(
2146 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07002147 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00002148 _drone_manager.absolute_path(control_path)],
2149 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07002150 if self.job.is_image_update_job():
2151 params += ['--image', self.job.update_image_path]
2152
jamesrenc44ae992010-02-19 00:12:54 +00002153 return params
showardd1195652009-12-08 22:21:02 +00002154
2155
2156 @property
2157 def num_processes(self):
2158 return len(self.queue_entries)
2159
2160
2161 @property
2162 def owner_username(self):
2163 return self.job.owner
2164
2165
2166 def _working_directory(self):
2167 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00002168
2169
jadmanski0afbb632008-06-06 21:10:57 +00002170 def prolog(self):
showardd9205182009-04-27 20:09:55 +00002171 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00002172 keyval_dict = self.job.keyval_dict()
2173 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00002174 group_name = self.queue_entries[0].get_group_name()
2175 if group_name:
2176 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00002177 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00002178 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002179 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00002180 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00002181
2182
showard35162b02009-03-03 02:17:30 +00002183 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00002184 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00002185 _drone_manager.write_lines_to_file(error_file_path,
2186 [_LOST_PROCESS_ERROR])
2187
2188
showardd3dc1992009-04-22 21:01:40 +00002189 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00002190 if not self.monitor:
2191 return
2192
showardd9205182009-04-27 20:09:55 +00002193 self._write_job_finished()
2194
showard35162b02009-03-03 02:17:30 +00002195 if self.monitor.lost_process:
2196 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00002197
jadmanskif7fa2cc2008-10-01 14:13:23 +00002198
showardcbd74612008-11-19 21:42:02 +00002199 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00002200 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00002201 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00002202 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00002203 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00002204
2205
jadmanskif7fa2cc2008-10-01 14:13:23 +00002206 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00002207 if not self.monitor or not self.monitor.has_process():
2208 return
2209
jadmanskif7fa2cc2008-10-01 14:13:23 +00002210 # build up sets of all the aborted_by and aborted_on values
2211 aborted_by, aborted_on = set(), set()
2212 for queue_entry in self.queue_entries:
2213 if queue_entry.aborted_by:
2214 aborted_by.add(queue_entry.aborted_by)
2215 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
2216 aborted_on.add(t)
2217
2218 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00002219 # TODO(showard): this conditional is now obsolete, we just need to leave
2220 # it in temporarily for backwards compatibility over upgrades. delete
2221 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00002222 assert len(aborted_by) <= 1
2223 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00002224 aborted_by_value = aborted_by.pop()
2225 aborted_on_value = max(aborted_on)
2226 else:
2227 aborted_by_value = 'autotest_system'
2228 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00002229
showarda0382352009-02-11 23:36:43 +00002230 self._write_keyval_after_job("aborted_by", aborted_by_value)
2231 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00002232
showardcbd74612008-11-19 21:42:02 +00002233 aborted_on_string = str(datetime.datetime.fromtimestamp(
2234 aborted_on_value))
2235 self._write_status_comment('Job aborted by %s on %s' %
2236 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00002237
2238
jadmanski0afbb632008-06-06 21:10:57 +00002239 def abort(self):
showarda9545c02009-12-18 22:44:26 +00002240 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00002241 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00002242 self._finish_task()
showard21baa452008-10-21 00:08:39 +00002243
2244
jadmanski0afbb632008-06-06 21:10:57 +00002245 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00002246 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002247 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002248
2249
2250class QueueTask(AbstractQueueTask):
2251 def __init__(self, queue_entries):
2252 super(QueueTask, self).__init__(queue_entries)
2253 self._set_ids(queue_entries=queue_entries)
2254
2255
2256 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002257 self._check_queue_entry_statuses(
2258 self.queue_entries,
2259 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
2260 models.HostQueueEntry.Status.RUNNING),
2261 allowed_host_statuses=(models.Host.Status.PENDING,
2262 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00002263
2264 super(QueueTask, self).prolog()
2265
2266 for queue_entry in self.queue_entries:
2267 self._write_host_keyvals(queue_entry.host)
2268 queue_entry.host.set_status(models.Host.Status.RUNNING)
2269 queue_entry.host.update_field('dirty', 1)
showarda9545c02009-12-18 22:44:26 +00002270
2271
2272 def _finish_task(self):
2273 super(QueueTask, self)._finish_task()
2274
2275 for queue_entry in self.queue_entries:
2276 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00002277 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00002278
2279
Alex Miller9f01d5d2013-08-08 02:26:01 -07002280 def _command_line(self):
2281 invocation = super(QueueTask, self)._command_line()
2282 return invocation + ['--verify_job_repo_url']
2283
2284
Dan Shi1a189052013-10-28 14:41:35 -07002285class HostlessQueueTask(AbstractQueueTask):
mbligh4608b002010-01-05 18:22:35 +00002286 def __init__(self, queue_entry):
2287 super(HostlessQueueTask, self).__init__([queue_entry])
2288 self.queue_entry_ids = [queue_entry.id]
2289
2290
2291 def prolog(self):
2292 self.queue_entries[0].update_field('execution_subdir', 'hostless')
2293 super(HostlessQueueTask, self).prolog()
2294
2295
mbligh4608b002010-01-05 18:22:35 +00002296 def _finish_task(self):
2297 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00002298 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00002299
2300
showardd3dc1992009-04-22 21:01:40 +00002301class PostJobTask(AgentTask):
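    # Base class for tasks that run against the results of an autoserv process
    # that has already finished (e.g. GatherLogsTask below). It pairs itself
    # with that autoserv run's pidfile via _autoserv_monitor, and abort() is
    # overridden to a no-op because post-job work continues even when the job
    # itself is aborted.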
showardd1195652009-12-08 22:21:02 +00002302 def __init__(self, queue_entries, log_file_name):
2303 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00002304
showardd1195652009-12-08 22:21:02 +00002305 self.queue_entries = queue_entries
2306
showardd3dc1992009-04-22 21:01:40 +00002307 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00002308 self._autoserv_monitor.attach_to_existing_process(
2309 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00002310
showardd1195652009-12-08 22:21:02 +00002311
2312 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00002313 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00002314 return 'true'
2315 return self._generate_command(
2316 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00002317
2318
2319 def _generate_command(self, results_dir):
2320 raise NotImplementedError('Subclasses must override this')
2321
2322
showardd1195652009-12-08 22:21:02 +00002323 @property
2324 def owner_username(self):
2325 return self.queue_entries[0].job.owner
2326
2327
2328 def _working_directory(self):
2329 return self._get_consistent_execution_path(self.queue_entries)
2330
2331
2332 def _paired_with_monitor(self):
2333 return self._autoserv_monitor
2334
2335
showardd3dc1992009-04-22 21:01:40 +00002336 def _job_was_aborted(self):
2337 was_aborted = None
showardd1195652009-12-08 22:21:02 +00002338 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002339 queue_entry.update_from_database()
2340 if was_aborted is None: # first queue entry
2341 was_aborted = bool(queue_entry.aborted)
2342 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
jamesren17cadd62010-06-16 23:26:55 +00002343 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
2344 for entry in self.queue_entries]
showardd3dc1992009-04-22 21:01:40 +00002345 email_manager.manager.enqueue_notify_email(
jamesren17cadd62010-06-16 23:26:55 +00002346 'Inconsistent abort state',
2347 'Queue entries have inconsistent abort state:\n' +
2348 '\n'.join(entries))
showardd3dc1992009-04-22 21:01:40 +00002349 # don't crash here, just assume true
2350 return True
2351 return was_aborted
2352
2353
showardd1195652009-12-08 22:21:02 +00002354 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002355 if self._job_was_aborted():
2356 return models.HostQueueEntry.Status.ABORTED
2357
2358 # we'll use a PidfileRunMonitor to read the autoserv exit status
2359 if self._autoserv_monitor.exit_code() == 0:
2360 return models.HostQueueEntry.Status.COMPLETED
2361 return models.HostQueueEntry.Status.FAILED
2362
2363
showardd3dc1992009-04-22 21:01:40 +00002364 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002365 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002366 queue_entry.set_status(status)
2367
2368
2369 def abort(self):
2370 # override AgentTask.abort() to avoid killing the process and ending
2371 # the task. post-job tasks continue when the job is aborted.
2372 pass
2373
2374
mbligh4608b002010-01-05 18:22:35 +00002375 def _pidfile_label(self):
2376 # '.autoserv_execute' -> 'autoserv'
2377 return self._pidfile_name()[1:-len('_execute')]
2378
2379
showard9bb960b2009-11-19 01:02:11 +00002380class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002381 """
2382 Task responsible for
2383 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2384 * copying logs to the results repository
2385 * spawning CleanupTasks for hosts, if necessary
2386 * spawning a FinalReparseTask for the job
2387 """
showardd1195652009-12-08 22:21:02 +00002388 def __init__(self, queue_entries, recover_run_monitor=None):
2389 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002390 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002391 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002392 self._set_ids(queue_entries=queue_entries)
2393
2394
Aviv Keshet308e7362013-05-21 14:43:16 -07002395 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd3dc1992009-04-22 21:01:40 +00002396 def _generate_command(self, results_dir):
2397 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002398 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00002399 return [_autoserv_path, '-p',
2400 '--pidfile-label=%s' % self._pidfile_label(),
2401 '--use-existing-results', '--collect-crashinfo',
2402 '-m', host_list, '-r', results_dir]
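    # Illustrative example of the resulting invocation (hostnames and results
    # path are hypothetical; the pidfile label assumes
    # drone_manager.CRASHINFO_PID_FILE is '.collect_crashinfo_execute'):
    #   autoserv -p --pidfile-label=collect_crashinfo --use-existing-results \
    #       --collect-crashinfo -m host1,host2 -r /results/123-debug_user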
showardd3dc1992009-04-22 21:01:40 +00002403
2404
showardd1195652009-12-08 22:21:02 +00002405 @property
2406 def num_processes(self):
2407 return len(self.queue_entries)
2408
2409
2410 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002411 return drone_manager.CRASHINFO_PID_FILE
showardd1195652009-12-08 22:21:02 +00002412
2413
showardd3dc1992009-04-22 21:01:40 +00002414 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002415 self._check_queue_entry_statuses(
2416 self.queue_entries,
2417 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
2418 allowed_host_statuses=(models.Host.Status.RUNNING,))
showard8cc058f2009-09-08 16:26:33 +00002419
showardd3dc1992009-04-22 21:01:40 +00002420 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002421
2422
showardd3dc1992009-04-22 21:01:40 +00002423 def epilog(self):
2424 super(GatherLogsTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002425 self._parse_results(self.queue_entries)
showard9bb960b2009-11-19 01:02:11 +00002426 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002427
showard9bb960b2009-11-19 01:02:11 +00002428
2429 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002430 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002431 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002432 models.HostQueueEntry.Status.COMPLETED)
2433 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2434 else:
2435 final_success = False
2436 num_tests_failed = 0
showard9bb960b2009-11-19 01:02:11 +00002437 reboot_after = self._job.reboot_after
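        # Summary of the policy below: reboot if the job was aborted, if the
        # job asked to always reboot, if it asked to reboot only when every
        # test passed and that is the case, or if any test failed.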
2438 do_reboot = (
2439 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002440 self._final_status() == models.HostQueueEntry.Status.ABORTED
jamesrendd855242010-03-02 22:23:44 +00002441 or reboot_after == model_attributes.RebootAfter.ALWAYS
2442 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
Dan Shi07e09af2013-04-12 09:31:29 -07002443 and final_success and num_tests_failed == 0)
2444 or num_tests_failed > 0)
showard9bb960b2009-11-19 01:02:11 +00002445
showardd1195652009-12-08 22:21:02 +00002446 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002447 if do_reboot:
2448 # don't pass the queue entry to the CleanupTask. if the cleanup
2449 # fails, the job doesn't care -- it's over.
2450 models.SpecialTask.objects.create(
2451 host=models.Host.objects.get(id=queue_entry.host.id),
2452 task=models.SpecialTask.Task.CLEANUP,
2453 requested_by=self._job.owner_model())
2454 else:
2455 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002456
2457
showard0bbfc212009-04-29 21:06:13 +00002458 def run(self):
showard597bfd32009-05-08 18:22:50 +00002459 autoserv_exit_code = self._autoserv_monitor.exit_code()
2460 # only run if Autoserv exited due to some signal. if we have no exit
2461 # code, assume something bad (and signal-like) happened.
2462 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002463 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002464 else:
2465 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002466
2467
Dan Shi1a189052013-10-28 14:41:35 -07002468class SelfThrottledPostJobTask(PostJobTask):
2469 """
2470 Special AgentTask subclass that maintains its own global process limit.
2471 """
2472 _num_running_processes = 0
2473 # Last known limit of max processes, used to check whether
2474 # max processes config has been changed.
2475 _last_known_max_processes = 0
2476 # Whether an email should be sent to notify that the process limit has been hit.
2477 _notification_on = True
2478 # Once the process limit is hit, an email will be sent.
2479 # To prevent spam, another email will not be sent until the number of
2480 # running processes drops below the fraction of the limit defined below.
2481 REVIVE_NOTIFICATION_THRESHOLD = 0.80
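    # How the throttle works: run() defers to _try_starting_process(), which
    # only launches the process while the class-wide count is under
    # _max_processes(); tick() keeps retrying until the process starts, and
    # finished() decrements the count.  Subclasses supply the limit by
    # overriding _max_processes() (e.g. FinalReparseTask below uses
    # scheduler_config.config.max_parse_processes).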
2482
2483
2484 @classmethod
2485 def _increment_running_processes(cls):
2486 cls._num_running_processes += 1
2487 stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
2488 cls._num_running_processes)
2489
2490
2491 @classmethod
2492 def _decrement_running_processes(cls):
2493 cls._num_running_processes -= 1
2494 stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
2495 cls._num_running_processes)
2496
2497
2498 @classmethod
2499 def _max_processes(cls):
2500 raise NotImplementedError
2501
2502
2503 @classmethod
2504 def _can_run_new_process(cls):
2505 return cls._num_running_processes < cls._max_processes()
2506
2507
2508 def _process_started(self):
2509 return bool(self.monitor)
2510
2511
2512 def tick(self):
2513 # override tick to keep trying to start until the running-process count
2514 # drops and we can start; at that point, revert to the default behavior
2515 if self._process_started():
2516 super(SelfThrottledPostJobTask, self).tick()
2517 else:
2518 self._try_starting_process()
2519
2520
2521 def run(self):
2522 # override run() to not actually run unless we can
2523 self._try_starting_process()
2524
2525
2526 @classmethod
2527 def _notify_process_limit_hit(cls):
2528 """Send an email to notify that process limit is hit."""
2529 if cls._notification_on:
2530 subject = '%s: hitting max process limit.' % cls.__name__
2531 message = ('Running processes/Max processes: %d/%d'
2532 % (cls._num_running_processes, cls._max_processes()))
2533 email_manager.manager.enqueue_notify_email(subject, message)
2534 cls._notification_on = False
2535
2536
2537 @classmethod
2538 def _reset_notification_switch_if_necessary(cls):
2539 """Reset _notification_on if necessary.
2540
2541 Set _notification_on to True in the following cases:
2542 1) The max processes limit in the configuration has changed;
2543 2) _notification_on is False and the number of running processes drops
2544 below the fraction of the limit defined by REVIVE_NOTIFICATION_THRESHOLD.
2545
2546 """
2547 if cls._last_known_max_processes != cls._max_processes():
2548 cls._notification_on = True
2549 cls._last_known_max_processes = cls._max_processes()
2550 return
2551 percentage = float(cls._num_running_processes) / cls._max_processes()
2552 if (not cls._notification_on and
2553 percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
2554 cls._notification_on = True
2555
2556
2557 def _try_starting_process(self):
2558 self._reset_notification_switch_if_necessary()
2559 if not self._can_run_new_process():
2560 self._notify_process_limit_hit()
2561 return
2562
2563 # actually run the command
2564 super(SelfThrottledPostJobTask, self).run()
2565 if self._process_started():
2566 self._increment_running_processes()
2567
2568
2569 def finished(self, success):
2570 super(SelfThrottledPostJobTask, self).finished(success)
2571 if self._process_started():
2572 self._decrement_running_processes()
2573
2574
2575class FinalReparseTask(SelfThrottledPostJobTask):
showardd1195652009-12-08 22:21:02 +00002576 def __init__(self, queue_entries):
2577 super(FinalReparseTask, self).__init__(queue_entries,
2578 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002579 # don't use _set_ids, since we don't want to set the host_ids
2580 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002581
2582
2583 def _generate_command(self, results_dir):
mbligh4608b002010-01-05 18:22:35 +00002584 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd1195652009-12-08 22:21:02 +00002585 results_dir]
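    # Illustrative example of the resulting invocation (results path is
    # hypothetical; _parser_path is resolved elsewhere in this file,
    # presumably to the tko parse script):
    #   tko/parse --write-pidfile -l 2 -r -o /results/123-debug_user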
2586
2587
2588 @property
2589 def num_processes(self):
2590 return 0 # don't include parser processes in accounting
2591
2592
2593 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002594 return drone_manager.PARSER_PID_FILE
showardd1195652009-12-08 22:21:02 +00002595
2596
showard97aed502008-11-04 02:01:24 +00002597 @classmethod
mbligh4608b002010-01-05 18:22:35 +00002598 def _max_processes(cls):
2599 return scheduler_config.config.max_parse_processes
showard97aed502008-11-04 02:01:24 +00002600
2601
2602 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002603 self._check_queue_entry_statuses(
2604 self.queue_entries,
2605 allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
showard8cc058f2009-09-08 16:26:33 +00002606
showard97aed502008-11-04 02:01:24 +00002607 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002608
2609
2610 def epilog(self):
2611 super(FinalReparseTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002612 self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002613
2614
Dan Shi1a189052013-10-28 14:41:35 -07002615class ArchiveResultsTask(SelfThrottledPostJobTask):
showarde1575b52010-01-15 00:21:12 +00002616 _ARCHIVING_FAILED_FILE = '.archiver_failed'
2617
mbligh4608b002010-01-05 18:22:35 +00002618 def __init__(self, queue_entries):
2619 super(ArchiveResultsTask, self).__init__(queue_entries,
2620 log_file_name='.archiving.log')
2621 # don't use _set_ids, since we don't want to set the host_ids
2622 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00002623
2624
mbligh4608b002010-01-05 18:22:35 +00002625 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002626 return drone_manager.ARCHIVER_PID_FILE
showard97aed502008-11-04 02:01:24 +00002627
2628
Aviv Keshet308e7362013-05-21 14:43:16 -07002629 # TODO: Refactor into autoserv_utils. crbug.com/243090
mbligh4608b002010-01-05 18:22:35 +00002630 def _generate_command(self, results_dir):
2631 return [_autoserv_path, '-p',
2632 '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
mblighe0cbc912010-03-11 18:03:07 +00002633 '--use-existing-results', '--control-filename=control.archive',
showard948eb302010-01-15 00:16:20 +00002634 os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
2635 'archive_results.control.srv')]
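    # Illustrative example of the resulting invocation (results path is
    # hypothetical; the pidfile label assumes drone_manager.ARCHIVER_PID_FILE
    # is '.archiver_execute'):
    #   autoserv -p --pidfile-label=archiver -r /results/123-debug_user \
    #       --use-existing-results --control-filename=control.archive \
    #       /usr/local/autotest/scheduler/archive_results.control.srv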
showard97aed502008-11-04 02:01:24 +00002636
2637
mbligh4608b002010-01-05 18:22:35 +00002638 @classmethod
2639 def _max_processes(cls):
2640 return scheduler_config.config.max_transfer_processes
showarda9545c02009-12-18 22:44:26 +00002641
2642
2643 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002644 self._check_queue_entry_statuses(
2645 self.queue_entries,
2646 allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
2647
2648 super(ArchiveResultsTask, self).prolog()
showarda9545c02009-12-18 22:44:26 +00002649
2650
mbligh4608b002010-01-05 18:22:35 +00002651 def epilog(self):
2652 super(ArchiveResultsTask, self).epilog()
showard4076c632010-01-15 20:28:49 +00002653 if not self.success and self._paired_with_monitor().has_process():
showarde1575b52010-01-15 00:21:12 +00002654 failed_file = os.path.join(self._working_directory(),
2655 self._ARCHIVING_FAILED_FILE)
2656 paired_process = self._paired_with_monitor().get_process()
2657 _drone_manager.write_lines_to_file(
2658 failed_file, ['Archiving failed with exit code %s'
2659 % self.monitor.exit_code()],
2660 paired_with_process=paired_process)
mbligh4608b002010-01-05 18:22:35 +00002661 self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002662
2663
mbligh36768f02008-02-22 18:28:33 +00002664if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002665 main()