#!/usr/bin/python -u
#pylint: disable-msg=C0111

"""
Autotest scheduler
"""


import datetime, optparse, os, signal
import sys, time, traceback, urllib
import logging, gc

import common
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.server import autoserv_utils
from autotest_lib.server.cros import provision
from autotest_lib.site_utils.graphite import stats

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_directory = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server')
_autoserv_path = os.path.join(_autoserv_directory, 'autoserv')
_testing_mode = False
_drone_manager = None


def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
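# Site hook: if autotest_lib.scheduler.site_monitor_db defines parser_path,
# that implementation is used; otherwise the stock tko/parse binary under the
# Autotest install directory is used.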
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60


def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

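    # Shutdown path (normal exit or after an uncaught exception): flush any
    # queued notification emails, then stop the status server, the drone
    # manager and the database connection.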
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
            logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
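    # The DroneManager singleton is obtained here; initialize() configures it
    # later with the drone list, results host and results directory.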
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to look up the Job object.
    """
    return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)


class BaseDispatcher(object):


    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def _log_extra_msg(self, msg):
        if self._extra_debugging:
            logging.debug(msg)


    def tick(self):
        """
        Run one scheduler pass, logging when each major step begins so we can
        figure out where most of the tick time is being spent.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        """
        Creates an Agent for the given task and adds it to the dispatcher's
        list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. Every agent we create is
        registered in two dicts: one keyed by the queue_entry_ids given to it
        and one keyed by the host_ids given to it. So theoretically a host can
        have any number of agents associated with it, and each of them can
        have any special agent task, though in practice we never see more than
        one agent/task per host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
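        # "Scheduled" here means queued but not yet started (is_active=False,
        # is_complete=False). _reverify_hosts_where() uses this to skip hosts
        # that already have a pending special task instead of queueing another
        # Cleanup for them.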
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all HQEs in the specified states.

        Loosely this translates to taking an HQE in one of the specified
        states, say parsing, and getting an AgentTask for it, like the
        FinalReparseTask, through _get_agent_task_for_queue_entry. Each queue
        entry can only have one agent task at a time, but there might be
        multiple queue entries in the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying
        # is handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
538 """
539 Construct an AgentTask class to run the given SpecialTask and add it
540 to this dispatcher.
beeps8bb1f7d2013-08-05 01:30:09 -0700541
542 A special task is create through schedule_special_tasks, but only if
543 the host doesn't already have an agent. This happens through
544 add_agent_task. All special agent tasks are given a host on creation,
545 and a Null hqe. To create a SpecialAgentTask object, you need a
546 models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
547 object contains a hqe it's passed on to the special agent task, which
548 creates a HostQueueEntry and saves it as it's queue_entry.
549
showardd1195652009-12-08 22:21:02 +0000550 @param special_task: a models.SpecialTask instance
551 @returns an AgentTask to run this SpecialTask
552 """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask,
                                      ResetTask, ProvisionTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
633 """
634 Returns all queued SpecialTasks prioritized for repair first, then
635 cleanup, then verify.
beeps8bb1f7d2013-08-05 01:30:09 -0700636
637 @return: list of afe.models.SpecialTasks sorted according to priority.
showard65db3932009-10-28 19:54:35 +0000638 """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY,
                               models.SpecialTask.Task.RESET,
                               models.SpecialTask.Task.PROVISION]
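        # sorted() is stable, so tasks of the same type keep their database
        # order; across types, earlier entries in task_priority_order are
        # scheduled first.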
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)


    def _schedule_special_tasks(self):
665 """
666 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700667
668 Special tasks include PreJobTasks like verify, reset and cleanup.
669 They are created through _schedule_new_jobs and associated with a hqe
670 This method translates SpecialTasks to the appropriate AgentTask and
671 adds them to the dispatchers agents list, so _handle_agents can execute
672 them.
showard65db3932009-10-28 19:54:35 +0000673 """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='afe_jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Look up the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
        """
        Find any new HQEs and call schedule_pre_job_tasks for each of them.

        This involves setting the status of the HQE and creating a row in the
        db corresponding to the special task, through
        scheduler_models._queue_special_task. The new db row is then added as
        an agent to the dispatcher through _schedule_special_tasks and
        scheduled for execution on the drone through _handle_agents.
        """
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        logging.debug('Processing %d queue_entries', len(queue_entries))
        for queue_entry in queue_entries:
            self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._log_extra_msg('Scheduling hostless job.')
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)


    def _schedule_running_host_queue_entries(self):
        """
        Adds agents to the dispatcher.

        Any AgentTask, like the QueueTask, is wrapped in an Agent. The
        QueueTask for example, will have a job with a control file, and
        the agent will have methods that poll, abort and check if the queue
        task is finished. The dispatcher runs the agent_task, as well as
        other agents in its _agents member, through _handle_agents, by
        calling the Agent's tick().

        This method creates an agent for each HQE in one of the (starting,
        running, gathering, parsing, archiving) states, and adds it to the
        dispatcher so it is handled by _handle_agents.
        """
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
                            queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        """
        Looks through the afe_host_queue_entries for aborted entries.

        The aborted bit is set on an HQE in many ways, the most common
        being when a user requests an abort through the frontend, which
        results in an rpc from the afe to abort_host_queue_entries.
        """
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted=1 and complete=0'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _find_aborted_special_tasks(self):
        """
        Find SpecialTasks that have been marked for abortion.

        Poll the database looking for SpecialTasks that are active
        and have been marked for abortion, then abort them.
        """

        # The completed and active bits are very important when it comes
        # to scheduler correctness. The active bit is set through the prolog
        # of a special task, and reset through the cleanup method of the
        # SpecialAgentTask. The cleanup is called both through the abort and
        # epilog. The complete bit is set in several places, and in general
        # a hanging job will have is_active=1 is_complete=0, while a special
        # task which completed will have is_active=0 is_complete=1. To check
        # aborts we directly check active because the complete bit is set in
        # several places, including the epilog of agent tasks.
        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
                                                          is_aborted=True)
        for task in aborted_tasks:
            # There are 2 ways to get the agent associated with a task,
            # through the host and through the hqe. A special task
            # always needs a host, but doesn't always need a hqe.
            for agent in self._host_agents.get(task.host.id, []):
                if isinstance(agent.task, SpecialAgentTask):

                    # The epilog performs critical actions such as queueing
                    # the next SpecialTask, requeuing the HQE, etc.; however,
                    # it doesn't actually kill the monitor process and set the
                    # 'done' bit. Epilogs assume that the job failed, and that
                    # the monitor process has already written an exit code.
                    # The done bit is a necessary condition for _handle_agents
                    # to schedule any more special tasks against the host, and
                    # it must be set in addition to is_active, is_complete and
                    # success.
                    agent.task.epilog()
                    agent.task.abort()


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
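        # Throttling, in order: zero-process agents always start; once the
        # limit has been reached nothing else with processes starts this
        # cycle; an agent needing more processes than the drones currently
        # have available is skipped; the first agent started in a cycle is
        # exempt from the per-cycle cap so many-process agents cannot be
        # starved forever.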
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True


    def _handle_agents(self):
931 Handles agents of the dispatcher.
932
933 Appropriate Agents are added to the dispatcher through
934 _schedule_running_host_queue_entries. These agents each
935 have a task. This method runs the agents task through
936 agent.tick() leading to:
937 agent.start
938 prolog -> AgentTasks prolog
939 For each queue entry:
940 sets host status/status to Running
941 set started_on in afe_host_queue_entries
942 run -> AgentTasks run
943 Creates PidfileRunMonitor
944 Queues the autoserv command line for this AgentTask
945 via the drone manager. These commands are executed
946 through the drone managers execute actions.
947 poll -> AgentTasks/BaseAgentTask poll
948 checks the monitors exit_code.
949 Executes epilog if task is finished.
950 Executes AgentTasks _finish_task
951 finish_task is usually responsible for setting the status
952 of the HQE/host, and updating it's active and complete fileds.
953
954 agent.is_done
955 Removed the agent from the dispatchers _agents queue.
956 Is_done checks the finished bit on the agent, that is
957 set based on the Agents task. During the agents poll
958 we check to see if the monitor process has exited in
959 it's finish method, and set the success member of the
960 task based on this exit code.
961 """
jadmanski0afbb632008-06-06 21:10:57 +0000962 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000963 have_reached_limit = False
964 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -0700965 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +0000966 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -0700967 self._log_extra_msg('Processing Agent with Host Ids: %s and '
968 'queue_entry ids:%s' % (agent.host_ids,
969 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +0000970 if not agent.started:
showard324bf812009-01-20 23:23:38 +0000971 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000972 have_reached_limit):
973 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -0700974 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +0000975 continue
showardd1195652009-12-08 22:21:02 +0000976 num_started_this_cycle += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -0700977 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +0000978 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -0700979 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +0000980 if agent.is_done():
Simran Basidef92872012-09-20 13:34:34 -0700981 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +0000982 self.remove_agent(agent)
Simran Basi3f6717d2012-09-13 15:21:22 -0700983 logging.info('%d running processes. %d added this cycle.',
984 _drone_manager.total_running_processes(),
985 num_started_this_cycle)
mbligh36768f02008-02-22 18:28:33 +0000986
987
showard29f7cd22009-04-29 21:16:24 +0000988 def _process_recurring_runs(self):
989 recurring_runs = models.RecurringRun.objects.filter(
990 start_date__lte=datetime.datetime.now())
991 for rrun in recurring_runs:
992 # Create job from template
993 job = rrun.job
994 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +0000995 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +0000996
997 host_objects = info['hosts']
998 one_time_hosts = info['one_time_hosts']
999 metahost_objects = info['meta_hosts']
1000 dependencies = info['dependencies']
1001 atomic_group = info['atomic_group']
1002
1003 for host in one_time_hosts or []:
1004 this_host = models.Host.create_one_time_host(host.hostname)
1005 host_objects.append(this_host)
1006
1007 try:
1008 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001009 options=options,
showard29f7cd22009-04-29 21:16:24 +00001010 host_objects=host_objects,
1011 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001012 atomic_group=atomic_group)
1013
1014 except Exception, ex:
1015 logging.exception(ex)
1016 #TODO send email
1017
1018 if rrun.loop_count == 1:
1019 rrun.delete()
1020 else:
1021 if rrun.loop_count != 0: # if not infinite loop
1022 # calculate new start_date
1023 difference = datetime.timedelta(seconds=rrun.loop_period)
1024 rrun.start_date = rrun.start_date + difference
1025 rrun.loop_count -= 1
1026 rrun.save()
1027
1028
Simran Basia858a232012-08-21 11:04:37 -07001029SiteDispatcher = utils.import_site_class(
1030 __file__, 'autotest_lib.scheduler.site_monitor_db',
1031 'SiteDispatcher', BaseDispatcher)
1032
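# Dispatcher resolves to BaseDispatcher unless a site-specific
# autotest_lib.scheduler.site_monitor_db module supplies its own
# SiteDispatcher subclass to extend or override the behaviour above.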
1033class Dispatcher(SiteDispatcher):
1034 pass
1035
1036
showard170873e2009-01-07 00:22:26 +00001037class PidfileRunMonitor(object):
1038 """
1039 Client must call either run() to start a new process or
1040 attach_to_existing_process().
1041 """
mbligh36768f02008-02-22 18:28:33 +00001042
showard170873e2009-01-07 00:22:26 +00001043 class _PidfileException(Exception):
1044 """
1045 Raised when there's some unexpected behavior with the pid file, but only
1046 used internally (never allowed to escape this class).
1047 """
mbligh36768f02008-02-22 18:28:33 +00001048
1049
showard170873e2009-01-07 00:22:26 +00001050 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001051 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001052 self._start_time = None
1053 self.pidfile_id = None
1054 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001055
1056
showard170873e2009-01-07 00:22:26 +00001057 def _add_nice_command(self, command, nice_level):
1058 if not nice_level:
1059 return command
1060 return ['nice', '-n', str(nice_level)] + command
1061
1062
1063 def _set_start_time(self):
1064 self._start_time = time.time()
1065
1066
showard418785b2009-11-23 20:19:59 +00001067 def run(self, command, working_directory, num_processes, nice_level=None,
1068 log_file=None, pidfile_name=None, paired_with_pidfile=None,
jamesren76fcf192010-04-21 20:39:50 +00001069 username=None, drone_hostnames_allowed=None):
showard170873e2009-01-07 00:22:26 +00001070 assert command is not None
1071 if nice_level is not None:
1072 command = ['nice', '-n', str(nice_level)] + command
1073 self._set_start_time()
1074 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001075 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001076 num_processes=num_processes, log_file=log_file,
jamesren76fcf192010-04-21 20:39:50 +00001077 paired_with_pidfile=paired_with_pidfile, username=username,
1078 drone_hostnames_allowed=drone_hostnames_allowed)
showard170873e2009-01-07 00:22:26 +00001079
1080
showarded2afea2009-07-07 20:54:07 +00001081 def attach_to_existing_process(self, execution_path,
jamesrenc44ae992010-02-19 00:12:54 +00001082 pidfile_name=drone_manager.AUTOSERV_PID_FILE,
showardd1195652009-12-08 22:21:02 +00001083 num_processes=None):
showard170873e2009-01-07 00:22:26 +00001084 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001085 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001086 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +00001087 if num_processes is not None:
1088 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +00001089
1090
jadmanski0afbb632008-06-06 21:10:57 +00001091 def kill(self):
showard170873e2009-01-07 00:22:26 +00001092 if self.has_process():
1093 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001094
mbligh36768f02008-02-22 18:28:33 +00001095
showard170873e2009-01-07 00:22:26 +00001096 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001097 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001098 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001099
1100
showard170873e2009-01-07 00:22:26 +00001101 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001102 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001103 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001104 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001105
1106
showard170873e2009-01-07 00:22:26 +00001107 def _read_pidfile(self, use_second_read=False):
1108 assert self.pidfile_id is not None, (
1109 'You must call run() or attach_to_existing_process()')
1110 contents = _drone_manager.get_pidfile_contents(
1111 self.pidfile_id, use_second_read=use_second_read)
1112 if contents.is_invalid():
1113 self._state = drone_manager.PidfileContents()
1114 raise self._PidfileException(contents)
1115 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001116
1117
showard21baa452008-10-21 00:08:39 +00001118 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001119 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1120 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001121 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001122 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001123
1124
1125 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001126 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001127 return
mblighbb421852008-03-11 22:36:16 +00001128
showard21baa452008-10-21 00:08:39 +00001129 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001130
showard170873e2009-01-07 00:22:26 +00001131 if self._state.process is None:
1132 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001133 return
mbligh90a549d2008-03-25 23:52:34 +00001134
showard21baa452008-10-21 00:08:39 +00001135 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001136 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001137 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001138 return
mbligh90a549d2008-03-25 23:52:34 +00001139
showard170873e2009-01-07 00:22:26 +00001140 # pid but no running process - maybe process *just* exited
1141 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001142 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001143 # autoserv exited without writing an exit code
1144 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001145 self._handle_pidfile_error(
1146 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001147
showard21baa452008-10-21 00:08:39 +00001148
1149 def _get_pidfile_info(self):
1150 """\
1151 After completion, self._state will contain:
1152 process=None, exit_status=None if autoserv has not yet run
1153 process!=None, exit_status=None if autoserv is running
1154 process!=None, exit_status!=None if autoserv has completed
1155 """
1156 try:
1157 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001158 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001159 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001160
1161
showard170873e2009-01-07 00:22:26 +00001162 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001163 """\
1164 Called when no pidfile is found or no pid is in the pidfile.
1165 """
showard170873e2009-01-07 00:22:26 +00001166 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001167 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001168 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001169 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001170 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001171
1172
showard35162b02009-03-03 02:17:30 +00001173 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001174 """\
1175 Called when autoserv has exited without writing an exit status,
1176 or we've timed out waiting for autoserv to write a pid to the
1177 pidfile. In either case, we just return failure and the caller
1178 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001179
showard170873e2009-01-07 00:22:26 +00001180 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001181 """
1182 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001183 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001184 self._state.exit_status = 1
1185 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001186
1187
jadmanski0afbb632008-06-06 21:10:57 +00001188 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001189 self._get_pidfile_info()
1190 return self._state.exit_status
1191
1192
1193 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001194 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001195 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001196 if self._state.num_tests_failed is None:
1197 return -1
showard21baa452008-10-21 00:08:39 +00001198 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001199
1200
showardcdaeae82009-08-31 18:32:48 +00001201 def try_copy_results_on_drone(self, **kwargs):
1202 if self.has_process():
1203 # copy results logs into the normal place for job results
1204 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1205
1206
1207 def try_copy_to_results_repository(self, source, **kwargs):
1208 if self.has_process():
1209 _drone_manager.copy_to_results_repository(self.get_process(),
1210 source, **kwargs)
1211
1212
mbligh36768f02008-02-22 18:28:33 +00001213class Agent(object):
showard77182562009-06-10 00:16:05 +00001214 """
Alex Miller47715eb2013-07-24 03:34:01 -07001215 An agent for use by the Dispatcher class to perform a task. An agent wraps
1216 around an AgentTask mainly to associate the AgentTask with the queue_entry
1217 and host ids.
showard77182562009-06-10 00:16:05 +00001218
1219 The following methods are required on all task objects:
1220 poll() - Called periodically to let the task check its status and
1221 update its internal state.
1222 is_done() - Returns True if the task is finished.
1223 abort() - Called when an abort has been requested. The task must
1224 set its aborted attribute to True if it actually aborted.
1225
1226 The following attributes are required on all task objects:
1227 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001228 success - bool, True if this task succeeded.
1229 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1230 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001231 """
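# Minimal sketch of how the dispatcher drives an agent (SomeAgentTask is a
# hypothetical stand-in for any object implementing the interface above):
#
#   agent = Agent(SomeAgentTask())
#   dispatcher.add_agent(agent)   # fills in agent.dispatcher
#   agent.tick()                  # polls the task until agent.is_done()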
1232
1233
showard418785b2009-11-23 20:19:59 +00001234 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001235 """
Alex Miller47715eb2013-07-24 03:34:01 -07001236 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001237 """
showard8cc058f2009-09-08 16:26:33 +00001238 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001239
showard77182562009-06-10 00:16:05 +00001240 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001241 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001242
showard8cc058f2009-09-08 16:26:33 +00001243 self.queue_entry_ids = task.queue_entry_ids
1244 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001245
showard8cc058f2009-09-08 16:26:33 +00001246 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001247 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001248
1249
jadmanski0afbb632008-06-06 21:10:57 +00001250 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001251 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001252 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001253 self.task.poll()
1254 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001255 self.finished = True
showardec113162008-05-08 00:52:49 +00001256
1257
jadmanski0afbb632008-06-06 21:10:57 +00001258 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001259 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001260
1261
showardd3dc1992009-04-22 21:01:40 +00001262 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001263 if self.task:
1264 self.task.abort()
1265 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001266 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001267 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001268
showardd3dc1992009-04-22 21:01:40 +00001269
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001270class BaseAgentTask(object):
showardd1195652009-12-08 22:21:02 +00001271 class _NullMonitor(object):
1272 pidfile_id = None
1273
1274 def has_process(self):
1275 return True
1276
1277
1278 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001279 """
showardd1195652009-12-08 22:21:02 +00001280 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001281 """
jadmanski0afbb632008-06-06 21:10:57 +00001282 self.done = False
showardd1195652009-12-08 22:21:02 +00001283 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001284 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001285 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001286 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001287 self.queue_entry_ids = []
1288 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001289 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001290
1291
1292 def _set_ids(self, host=None, queue_entries=None):
1293 if queue_entries and queue_entries != [None]:
1294 self.host_ids = [entry.host.id for entry in queue_entries]
1295 self.queue_entry_ids = [entry.id for entry in queue_entries]
1296 else:
1297 assert host
1298 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001299
1300
jadmanski0afbb632008-06-06 21:10:57 +00001301 def poll(self):
showard08a36412009-05-05 01:01:13 +00001302 if not self.started:
1303 self.start()
showardd1195652009-12-08 22:21:02 +00001304 if not self.done:
1305 self.tick()
showard08a36412009-05-05 01:01:13 +00001306
1307
1308 def tick(self):
showardd1195652009-12-08 22:21:02 +00001309 assert self.monitor
1310 exit_code = self.monitor.exit_code()
1311 if exit_code is None:
1312 return
mbligh36768f02008-02-22 18:28:33 +00001313
showardd1195652009-12-08 22:21:02 +00001314 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001315 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001316
1317
jadmanski0afbb632008-06-06 21:10:57 +00001318 def is_done(self):
1319 return self.done
mbligh36768f02008-02-22 18:28:33 +00001320
1321
jadmanski0afbb632008-06-06 21:10:57 +00001322 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001323 if self.done:
showardd1195652009-12-08 22:21:02 +00001324 assert self.started
showard08a36412009-05-05 01:01:13 +00001325 return
showardd1195652009-12-08 22:21:02 +00001326 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001327 self.done = True
1328 self.success = success
1329 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001330
1331
jadmanski0afbb632008-06-06 21:10:57 +00001332 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001333 """
1334 To be overridden.
1335 """
showarded2afea2009-07-07 20:54:07 +00001336 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001337 self.register_necessary_pidfiles()
1338
1339
1340 def _log_file(self):
1341 if not self._log_file_name:
1342 return None
1343 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001344
mbligh36768f02008-02-22 18:28:33 +00001345
jadmanski0afbb632008-06-06 21:10:57 +00001346 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001347 log_file = self._log_file()
1348 if self.monitor and log_file:
1349 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001350
1351
jadmanski0afbb632008-06-06 21:10:57 +00001352 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001353 """
1354 To be overridden.
1355 """
jadmanski0afbb632008-06-06 21:10:57 +00001356 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001357 logging.info("%s finished with success=%s", type(self).__name__,
1358 self.success)
1359
mbligh36768f02008-02-22 18:28:33 +00001360
1361
jadmanski0afbb632008-06-06 21:10:57 +00001362 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001363 if not self.started:
1364 self.prolog()
1365 self.run()
1366
1367 self.started = True
1368
1369
1370 def abort(self):
1371 if self.monitor:
1372 self.monitor.kill()
1373 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001374 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001375 self.cleanup()
1376
1377
showarded2afea2009-07-07 20:54:07 +00001378 def _get_consistent_execution_path(self, execution_entries):
1379 first_execution_path = execution_entries[0].execution_path()
1380 for execution_entry in execution_entries[1:]:
1381 assert execution_entry.execution_path() == first_execution_path, (
1382 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1383 execution_entry,
1384 first_execution_path,
1385 execution_entries[0]))
1386 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001387
1388
showarded2afea2009-07-07 20:54:07 +00001389 def _copy_results(self, execution_entries, use_monitor=None):
1390 """
1391 @param execution_entries: list of objects with execution_path() method
1392 """
showard6d1c1432009-08-20 23:30:39 +00001393 if use_monitor is not None and not use_monitor.has_process():
1394 return
1395
showarded2afea2009-07-07 20:54:07 +00001396 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001397 if use_monitor is None:
1398 assert self.monitor
1399 use_monitor = self.monitor
1400 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001401 execution_path = self._get_consistent_execution_path(execution_entries)
1402 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001403 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001404
showarda1e74b32009-05-12 17:32:04 +00001405
1406 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001407 for queue_entry in queue_entries:
1408 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001409
1410
mbligh4608b002010-01-05 18:22:35 +00001411 def _archive_results(self, queue_entries):
1412 for queue_entry in queue_entries:
1413 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001414
1415
showardd1195652009-12-08 22:21:02 +00001416 def _command_line(self):
1417 """
1418 Return the command line to run. Must be overridden.
1419 """
1420 raise NotImplementedError
1421
1422
1423 @property
1424 def num_processes(self):
1425 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001426 Return the number of processes forked by this BaseAgentTask's process.
1427 It may only be approximate. To be overridden if necessary.
showardd1195652009-12-08 22:21:02 +00001428 """
1429 return 1
1430
1431
1432 def _paired_with_monitor(self):
1433 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001434 If this BaseAgentTask's process must run on the same machine as some
showardd1195652009-12-08 22:21:02 +00001435 previous process, this method should be overridden to return a
1436 PidfileRunMonitor for that process.
1437 """
1438 return self._NullMonitor()
1439
1440
1441 @property
1442 def owner_username(self):
1443 """
1444 Return login of user responsible for this task. May be None. Must be
1445 overridden.
1446 """
1447 raise NotImplementedError
1448
1449
1450 def _working_directory(self):
1451 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001452 Return the directory where this BaseAgentTask's process executes.
1453 Must be overridden.
showardd1195652009-12-08 22:21:02 +00001454 """
1455 raise NotImplementedError
1456
1457
1458 def _pidfile_name(self):
1459 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001460 Return the name of the pidfile this BaseAgentTask's process uses. To be
showardd1195652009-12-08 22:21:02 +00001461 overridden if necessary.
1462 """
jamesrenc44ae992010-02-19 00:12:54 +00001463 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001464
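# Hypothetical sketch of the minimum a concrete subclass must override; the
# real implementations are SpecialAgentTask and AbstractQueueTask below.
#
#   class EchoTask(BaseAgentTask):
#       def _command_line(self):
#           return ['echo', 'hello']
#
#       def _working_directory(self):
#           return 'hostless/echo'   # relative results path (hypothetical)
#
#       @property
#       def owner_username(self):
#           return 'autotest_system'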
1465
1466 def _check_paired_results_exist(self):
1467 if not self._paired_with_monitor().has_process():
1468 email_manager.manager.enqueue_notify_email(
1469 'No paired results in task',
1470 'No paired results in task %s at %s'
1471 % (self, self._paired_with_monitor().pidfile_id))
1472 self.finished(False)
1473 return False
1474 return True
1475
1476
1477 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001478 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001479 self.monitor = PidfileRunMonitor()
1480
1481
1482 def run(self):
1483 if not self._check_paired_results_exist():
1484 return
1485
1486 self._create_monitor()
1487 self.monitor.run(
1488 self._command_line(), self._working_directory(),
1489 num_processes=self.num_processes,
1490 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1491 pidfile_name=self._pidfile_name(),
1492 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001493 username=self.owner_username,
1494 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1495
1496
1497 def get_drone_hostnames_allowed(self):
1498 if not models.DroneSet.drone_sets_enabled():
1499 return None
1500
1501 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1502 if not hqes:
1503 # Only special tasks could be missing host queue entries
1504 assert isinstance(self, SpecialAgentTask)
1505 return self._user_or_global_default_drone_set(
1506 self.task, self.task.requested_by)
1507
1508 job_ids = hqes.values_list('job', flat=True).distinct()
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001509 assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
jamesren76fcf192010-04-21 20:39:50 +00001510 "span multiple jobs")
1511
1512 job = models.Job.objects.get(id=job_ids[0])
1513 drone_set = job.drone_set
1514 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001515 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001516
1517 return drone_set.get_drone_hostnames()
1518
1519
1520 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1521 """
1522 Returns the user's default drone set, if present.
1523
1524 Otherwise, returns the global default drone set.
1525 """
1526 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1527 if not user:
1528 logging.warn('%s had no owner; using default drone set',
1529 obj_with_owner)
1530 return default_hostnames
1531 if not user.drone_set:
1532 logging.warn('User %s has no default drone set, using global '
1533 'default', user.login)
1534 return default_hostnames
1535 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001536
1537
1538 def register_necessary_pidfiles(self):
1539 pidfile_id = _drone_manager.get_pidfile_id_from(
1540 self._working_directory(), self._pidfile_name())
1541 _drone_manager.register_pidfile(pidfile_id)
1542
1543 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1544 if paired_pidfile_id:
1545 _drone_manager.register_pidfile(paired_pidfile_id)
1546
1547
1548 def recover(self):
1549 if not self._check_paired_results_exist():
1550 return
1551
1552 self._create_monitor()
1553 self.monitor.attach_to_existing_process(
1554 self._working_directory(), pidfile_name=self._pidfile_name(),
1555 num_processes=self.num_processes)
1556 if not self.monitor.has_process():
1557 # no process to recover; wait to be started normally
1558 self.monitor = None
1559 return
1560
1561 self.started = True
Aviv Keshet225bdfe2013-03-05 10:10:08 -08001562 logging.info('Recovering process %s for %s at %s',
1563 self.monitor.get_process(), type(self).__name__,
1564 self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001565
1566
mbligh4608b002010-01-05 18:22:35 +00001567 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1568 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001569 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001570 for entry in queue_entries:
1571 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001572 raise host_scheduler.SchedulerError(
1573 '%s attempting to start entry with invalid status %s: '
1574 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001575 invalid_host_status = (
1576 allowed_host_statuses is not None
1577 and entry.host.status not in allowed_host_statuses)
1578 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001579 raise host_scheduler.SchedulerError(
1580 '%s attempting to start on queue entry with invalid '
1581 'host status %s: %s'
1582 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001583
1584
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001585SiteAgentTask = utils.import_site_class(
1586 __file__, 'autotest_lib.scheduler.site_monitor_db',
1587 'SiteAgentTask', BaseAgentTask)
1588
1589class AgentTask(SiteAgentTask):
1590 pass
1591
1592
showardd9205182009-04-27 20:09:55 +00001593class TaskWithJobKeyvals(object):
1594 """AgentTask mixin providing functionality to help with job keyval files."""
1595 _KEYVAL_FILE = 'keyval'
1596 def _format_keyval(self, key, value):
1597 return '%s=%s' % (key, value)
1598
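# The keyval file written by this mixin is a plain list of key=value lines,
# one per call, e.g. (timestamps are illustrative):
#
#   job_queued=1377648000
#   job_finished=1377651600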
1599
1600 def _keyval_path(self):
1601 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001602 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001603
1604
1605 def _write_keyval_after_job(self, field, value):
1606 assert self.monitor
1607 if not self.monitor.has_process():
1608 return
1609 _drone_manager.write_lines_to_file(
1610 self._keyval_path(), [self._format_keyval(field, value)],
1611 paired_with_process=self.monitor.get_process())
1612
1613
1614 def _job_queued_keyval(self, job):
1615 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1616
1617
1618 def _write_job_finished(self):
1619 self._write_keyval_after_job("job_finished", int(time.time()))
1620
1621
showarddb502762009-09-09 15:31:20 +00001622 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1623 keyval_contents = '\n'.join(self._format_keyval(key, value)
1624 for key, value in keyval_dict.iteritems())
1625 # always end with a newline to allow additional keyvals to be written
1626 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001627 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001628 keyval_contents,
1629 file_path=keyval_path)
1630
1631
1632 def _write_keyvals_before_job(self, keyval_dict):
1633 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1634
1635
1636 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001637 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001638 host.hostname)
1639 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001640 all_labels = [ urllib.quote(label) for label in all_labels ]
showarddb502762009-09-09 15:31:20 +00001641 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1642 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1643
1644
Alex Millerc23a7f02013-08-27 17:36:42 -07001645class SelfThrottledTask(AgentTask):
1646 """
1647 Special AgentTask subclass that maintains its own global process limit.
1648 """
1649 _num_running_processes = 0
1650
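# Sketch of a throttled subclass (the limit here is hypothetical; see
# HostlessQueueTask below for a real one, driven by scheduler_config):
#
#   class MyThrottledTask(SelfThrottledTask):
#       @classmethod
#       def _max_processes(cls):
#           return 5   # allow at most five concurrent processes of this type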
1651
1652 @classmethod
1653 def _increment_running_processes(cls):
1654 cls._num_running_processes += 1
1655
1656
1657 @classmethod
1658 def _decrement_running_processes(cls):
1659 cls._num_running_processes -= 1
1660
1661
1662 @classmethod
1663 def _max_processes(cls):
1664 raise NotImplementedError
1665
1666
1667 @classmethod
1668 def _can_run_new_process(cls):
1669 return cls._num_running_processes < cls._max_processes()
1670
1671
1672 def _process_started(self):
1673 return bool(self.monitor)
1674
1675
1676 def tick(self):
1677 # override tick to keep trying to start until the process count goes
1678 # down and we can start, at which point we revert to default behavior
1679 if self._process_started():
1680 super(SelfThrottledTask, self).tick()
1681 else:
1682 self._try_starting_process()
1683
1684
1685 def run(self):
1686 # override run() to not actually run unless we can
1687 self._try_starting_process()
1688
1689
1690 def _try_starting_process(self):
1691 if not self._can_run_new_process():
1692 return
1693
1694 # actually run the command
1695 super(SelfThrottledTask, self).run()
1696 if self._process_started():
1697 self._increment_running_processes()
1698
1699
1700 def finished(self, success):
1701 super(SelfThrottledTask, self).finished(success)
1702 if self._process_started():
1703 self._decrement_running_processes()
1704
1705
1706
showard8cc058f2009-09-08 16:26:33 +00001707class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001708 """
1709 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1710 """
1711
1712 TASK_TYPE = None
1713 host = None
1714 queue_entry = None
1715
showardd1195652009-12-08 22:21:02 +00001716 def __init__(self, task, extra_command_args):
1717 super(SpecialAgentTask, self).__init__()
1718
lmrb7c5d272010-04-16 06:34:04 +00001719 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001720
jamesrenc44ae992010-02-19 00:12:54 +00001721 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001722 self.queue_entry = None
1723 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001724 self.queue_entry = scheduler_models.HostQueueEntry(
1725 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001726
showarded2afea2009-07-07 20:54:07 +00001727 self.task = task
1728 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001729
1730
showard8cc058f2009-09-08 16:26:33 +00001731 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001732 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1733
1734
1735 def _command_line(self):
1736 return _autoserv_command_line(self.host.hostname,
1737 self._extra_command_args,
1738 queue_entry=self.queue_entry)
1739
1740
1741 def _working_directory(self):
1742 return self.task.execution_path()
1743
1744
1745 @property
1746 def owner_username(self):
1747 if self.task.requested_by:
1748 return self.task.requested_by.login
1749 return None
showard8cc058f2009-09-08 16:26:33 +00001750
1751
showarded2afea2009-07-07 20:54:07 +00001752 def prolog(self):
1753 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001754 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001755 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001756
1757
showardde634ee2009-01-30 01:44:24 +00001758 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001759 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001760
showard2fe3f1d2009-07-06 20:19:11 +00001761 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001762 return # don't fail metahost entries, they'll be reassigned
1763
showard2fe3f1d2009-07-06 20:19:11 +00001764 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001765 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001766 return # entry has been aborted
1767
Alex Millerdfff2fd2013-05-28 13:05:06 -07001768 self._actually_fail_queue_entry()
1769
1770
1771 # TODO(milleral): http://crbug.com/268607
1772 # All this used to be a part of _fail_queue_entry. The
1773 # exact semantics of when one should and should not be failing a queue
1774 # entry need to be worked out, because provisioning has placed us in a
1775 # case where we want to fail a queue entry that could be requeued,
1776 # which trips the two early-return checks above, and thus
1777 # _fail_queue_entry() would exit early and have no effect.
1778 # What's left here with _actually_fail_queue_entry is a hack to be able to
1779 # bypass the checks and unconditionally execute the code.
1780 def _actually_fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001781 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001782 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001783 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001784 self._write_keyval_after_job(queued_key, queued_time)
1785 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001786
showard8cc058f2009-09-08 16:26:33 +00001787 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001788 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001789 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001790 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001791
showard8cc058f2009-09-08 16:26:33 +00001792 pidfile_id = _drone_manager.get_pidfile_id_from(
1793 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001794 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001795 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001796
1797 if self.queue_entry.job.parse_failed_repair:
1798 self._parse_results([self.queue_entry])
1799 else:
1800 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001801
Alex Miller23676a22013-07-03 09:03:36 -07001802 # Also fail all other special tasks that have not yet run for this HQE
1803 pending_tasks = models.SpecialTask.objects.filter(
1804 queue_entry__id=self.queue_entry.id,
1805 is_complete=0)
Alex Miller5e36ccc2013-08-03 16:31:58 -07001806 for task in pending_tasks:
1807 task.finish(False)
Alex Miller23676a22013-07-03 09:03:36 -07001808
showard8cc058f2009-09-08 16:26:33 +00001809
1810 def cleanup(self):
1811 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001812
1813 # We will consider an aborted task to be "Failed"
1814 self.task.finish(bool(self.success))
1815
showardf85a0b72009-10-07 20:48:45 +00001816 if self.monitor:
1817 if self.monitor.has_process():
1818 self._copy_results([self.task])
1819 if self.monitor.pidfile_id is not None:
1820 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001821
1822
Dan Shi07e09af2013-04-12 09:31:29 -07001823 def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
1824 """Remove a type of special task in all tasks, keep last one if needed.
1825
1826 @param special_task_to_remove: type of special task to be removed, e.g.,
1827 models.SpecialTask.Task.VERIFY.
1828 @param keep_last_one: True to keep the last special task if its type is
1829 the same as of special_task_to_remove.
1830
1831 """
1832 queued_special_tasks = models.SpecialTask.objects.filter(
1833 host__id=self.host.id,
1834 task=special_task_to_remove,
1835 is_active=False, is_complete=False, queue_entry=None)
1836 if keep_last_one:
1837 queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
1838 queued_special_tasks.delete()
1839
1840
showard8cc058f2009-09-08 16:26:33 +00001841class RepairTask(SpecialAgentTask):
1842 TASK_TYPE = models.SpecialTask.Task.REPAIR
1843
1844
showardd1195652009-12-08 22:21:02 +00001845 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001846 """\
1847 task: the SpecialTask to run; its queue entry, if any, is marked failed if this repair fails.
1848 """
1849 protection = host_protections.Protection.get_string(
1850 task.host.protection)
1851 # normalize the protection name
1852 protection = host_protections.Protection.get_attr_name(protection)
1853
1854 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001855 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001856
1857 # *don't* include the queue entry in IDs -- if the queue entry is
1858 # aborted, we want to leave the repair task running
1859 self._set_ids(host=self.host)
1860
1861
1862 def prolog(self):
1863 super(RepairTask, self).prolog()
1864 logging.info("repair_task starting")
1865 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001866
1867
jadmanski0afbb632008-06-06 21:10:57 +00001868 def epilog(self):
1869 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001870
jadmanski0afbb632008-06-06 21:10:57 +00001871 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001872 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001873 else:
showard8cc058f2009-09-08 16:26:33 +00001874 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001875 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001876 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001877
1878
showarded2afea2009-07-07 20:54:07 +00001879class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001880 def _copy_to_results_repository(self):
1881 if not self.queue_entry or self.queue_entry.meta_host:
1882 return
1883
1884 self.queue_entry.set_execution_subdir()
1885 log_name = os.path.basename(self.task.execution_path())
1886 source = os.path.join(self.task.execution_path(), 'debug',
1887 'autoserv.DEBUG')
1888 destination = os.path.join(
1889 self.queue_entry.execution_path(), log_name)
1890
1891 self.monitor.try_copy_to_results_repository(
1892 source, destination_path=destination)
1893
1894
showard170873e2009-01-07 00:22:26 +00001895 def epilog(self):
1896 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001897
showard775300b2009-09-09 15:30:50 +00001898 if self.success:
1899 return
showard8fe93b52008-11-18 17:53:22 +00001900
showard775300b2009-09-09 15:30:50 +00001901 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001902
showard775300b2009-09-09 15:30:50 +00001903 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001904 # effectively ignore failure for these hosts
1905 self.success = True
showard775300b2009-09-09 15:30:50 +00001906 return
1907
1908 if self.queue_entry:
1909 self.queue_entry.requeue()
Alex Millerf3f19452013-07-29 15:53:00 -07001910 # If we requeue a HQE, we should cancel any remaining pre-job
1911 # tasks against this host, otherwise we'll be left in a state
1912 # where a queued HQE has special tasks to run against a host.
1913 models.SpecialTask.objects.filter(
1914 queue_entry__id=self.queue_entry.id,
1915 host__id=self.host.id,
1916 is_complete=0).update(is_complete=1, success=0)
showard775300b2009-09-09 15:30:50 +00001917
1918 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001919 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001920 queue_entry__id=self.queue_entry.id):
1921 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1922 self._fail_queue_entry()
1923 return
1924
showard9bb960b2009-11-19 01:02:11 +00001925 queue_entry = models.HostQueueEntry.objects.get(
1926 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001927 else:
1928 queue_entry = None
1929
1930 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001931 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001932 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001933 queue_entry=queue_entry,
1934 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001935
showard8fe93b52008-11-18 17:53:22 +00001936
Alex Miller42437f92013-05-28 12:58:54 -07001937 def _should_pending(self):
1938 """
1939 Decide if we should call the host queue entry's on_pending method.
1940 We should if:
1941 1) There exists an associated host queue entry.
1942 2) The current special task completed successfully.
1943 3) There do not exist any more special tasks to be run before the
1944 host queue entry starts.
1945
1946 @returns: True if we should call pending, false if not.
1947
1948 """
1949 if not self.queue_entry or not self.success:
1950 return False
1951
1952 # We know if this is the last one when we create it, so we could add
1953 # another column to the database to keep track of this information, but
1954 # I expect the overhead of querying here to be minimal.
1955 queue_entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1956 queued = models.SpecialTask.objects.filter(
1957 host__id=self.host.id, is_active=False,
1958 is_complete=False, queue_entry=queue_entry)
1959 queued = queued.exclude(id=self.task.id)
1960 return queued.count() == 0
1961
1962
showard8fe93b52008-11-18 17:53:22 +00001963class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001964 TASK_TYPE = models.SpecialTask.Task.VERIFY
1965
1966
showardd1195652009-12-08 22:21:02 +00001967 def __init__(self, task):
1968 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001969 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001970
1971
jadmanski0afbb632008-06-06 21:10:57 +00001972 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001973 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001974
showardb18134f2009-03-20 20:52:18 +00001975 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001976 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001977 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1978 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001979
jamesren42318f72010-05-10 23:40:59 +00001980 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00001981 # and there's no need to keep records of other requests.
Dan Shi07e09af2013-04-12 09:31:29 -07001982 self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
1983 keep_last_one=True)
showard2fe3f1d2009-07-06 20:19:11 +00001984
mbligh36768f02008-02-22 18:28:33 +00001985
jadmanski0afbb632008-06-06 21:10:57 +00001986 def epilog(self):
1987 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001988 if self.success:
Alex Miller42437f92013-05-28 12:58:54 -07001989 if self._should_pending():
showard8cc058f2009-09-08 16:26:33 +00001990 self.queue_entry.on_pending()
1991 else:
1992 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001993
1994
mbligh4608b002010-01-05 18:22:35 +00001995class CleanupTask(PreJobTask):
1996 # note this can also run post-job, but when it does, it's running standalone
1997 # against the host (not related to the job), so it's not considered a
1998 # PostJobTask
1999
2000 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2001
2002
2003 def __init__(self, task, recover_run_monitor=None):
2004 super(CleanupTask, self).__init__(task, ['--cleanup'])
2005 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
2006
2007
2008 def prolog(self):
2009 super(CleanupTask, self).prolog()
2010 logging.info("starting cleanup task for host: %s", self.host.hostname)
2011 self.host.set_status(models.Host.Status.CLEANING)
2012 if self.queue_entry:
Dan Shi07e09af2013-04-12 09:31:29 -07002013 self.queue_entry.set_status(models.HostQueueEntry.Status.CLEANING)
mbligh4608b002010-01-05 18:22:35 +00002014
2015
2016 def _finish_epilog(self):
2017 if not self.queue_entry or not self.success:
2018 return
2019
2020 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
2021 should_run_verify = (
2022 self.queue_entry.job.run_verify
2023 and self.host.protection != do_not_verify_protection)
2024 if should_run_verify:
2025 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
2026 models.SpecialTask.objects.create(
2027 host=models.Host.objects.get(id=self.host.id),
2028 queue_entry=entry,
2029 task=models.SpecialTask.Task.VERIFY)
2030 else:
Alex Miller42437f92013-05-28 12:58:54 -07002031 if self._should_pending():
2032 self.queue_entry.on_pending()
mbligh4608b002010-01-05 18:22:35 +00002033
2034
2035 def epilog(self):
2036 super(CleanupTask, self).epilog()
2037
2038 if self.success:
2039 self.host.update_field('dirty', 0)
2040 self.host.set_status(models.Host.Status.READY)
2041
2042 self._finish_epilog()
2043
2044
Dan Shi07e09af2013-04-12 09:31:29 -07002045class ResetTask(PreJobTask):
2046 """Task to reset a DUT, including cleanup and verify."""
2047 # note this can also run post-job, but when it does, it's running standalone
2048 # against the host (not related to the job), so it's not considered a
2049 # PostJobTask
2050
2051 TASK_TYPE = models.SpecialTask.Task.RESET
2052
2053
2054 def __init__(self, task, recover_run_monitor=None):
2055 super(ResetTask, self).__init__(task, ['--reset'])
2056 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
2057
2058
2059 def prolog(self):
2060 super(ResetTask, self).prolog()
2061 logging.info('starting reset task for host: %s',
2062 self.host.hostname)
2063 self.host.set_status(models.Host.Status.RESETTING)
2064 if self.queue_entry:
2065 self.queue_entry.set_status(models.HostQueueEntry.Status.RESETTING)
2066
2067 # Delete any queued cleanups for this host.
2068 self.remove_special_tasks(models.SpecialTask.Task.CLEANUP,
2069 keep_last_one=False)
2070
2071 # Delete any queued reverifies for this host.
2072 self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
2073 keep_last_one=False)
2074
2075 # Only one reset is needed.
2076 self.remove_special_tasks(models.SpecialTask.Task.RESET,
2077 keep_last_one=True)
2078
2079
2080 def epilog(self):
2081 super(ResetTask, self).epilog()
2082
2083 if self.success:
2084 self.host.update_field('dirty', 0)
Dan Shi07e09af2013-04-12 09:31:29 -07002085
Alex Millerba076c52013-07-11 10:11:48 -07002086 if self._should_pending():
Dan Shi07e09af2013-04-12 09:31:29 -07002087 self.queue_entry.on_pending()
Alex Millerdc608d52013-07-30 14:26:21 -07002088 else:
2089 self.host.set_status(models.Host.Status.READY)
Dan Shi07e09af2013-04-12 09:31:29 -07002090
2091
Alex Millerdfff2fd2013-05-28 13:05:06 -07002092class ProvisionTask(PreJobTask):
2093 TASK_TYPE = models.SpecialTask.Task.PROVISION
2094
2095 def __init__(self, task):
2096 # Provisioning requires that we be associated with a job/queue entry
2097 assert task.queue_entry, "No HQE associated with provision task!"
2098 # task.queue_entry is an afe model HostQueueEntry object.
2099 # self.queue_entry is a scheduler models HostQueueEntry object, but
2100 # it gets constructed and assigned in __init__, so it's not available
2101 # yet. Therefore, we're stuck pulling labels off of the afe model
2102 # so that we can pass the --provision args into the __init__ call.
2103 labels = {x.name for x in task.queue_entry.job.dependency_labels.all()}
2104 _, provisionable = provision.filter_labels(labels)
2105 extra_command_args = ['--provision', ','.join(provisionable)]
2106 super(ProvisionTask, self).__init__(task, extra_command_args)
2107 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
2108
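# Illustrative example (label names hypothetical): for a job whose dependency
# labels are {'board:lumpy', 'cros-version:lumpy-release/R28-3895.0.0'},
# provision.filter_labels() is expected to pull out the provisionable
# label(s), so _extra_command_args would end up roughly as
# ['--provision', 'cros-version:lumpy-release/R28-3895.0.0'].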
2109
2110 def _command_line(self):
2111 # If we give queue_entry to _autoserv_command_line, then it will append
2112 # -c for this invocation if the queue_entry is a client side test. We
2113 # don't want that, as it messes with provisioning, so we just drop it
2114 # from the arguments here.
2115 # Note that we also don't verify job_repo_url as provisioning tasks are
2116 # required to stage whatever content we need, and the job itself will
2117 # force autotest to be staged if it isn't already.
2118 return _autoserv_command_line(self.host.hostname,
2119 self._extra_command_args)
2120
2121
2122 def prolog(self):
2123 super(ProvisionTask, self).prolog()
2124 # TODO: add a check for a previous provision task and abort if one exists.
2125 logging.info("starting provision task for host: %s", self.host.hostname)
2126 self.queue_entry.set_status(
2127 models.HostQueueEntry.Status.PROVISIONING)
2128 self.host.set_status(models.Host.Status.PROVISIONING)
2129
2130
2131 def epilog(self):
2132 # TODO(milleral) Here, we override the PreJobTask's epilog, because
2133 # it's written with the idea that pre-job special task failures are a
2134 # problem with the host and not with something about the HQE.
2135 # In our case, the HQE's DEPENDENCIES specify what the provision task
2136 # does, so if the provision fails, it can be the fault of the HQE, and
2137 # thus we fail the HQE. This difference is handled only here for now,
2138 # but some refactoring of PreJobTask should likely happen sometime in
2139 # the future?
Alex Millere8949c92013-08-07 17:10:15 -07002140 # This call is needed to log the status and call into self.cleanup(),
2141 # which is PreJobTasks's cleanup, which marks is_complete=1.
2142 AgentTask.epilog(self)
Alex Millerdfff2fd2013-05-28 13:05:06 -07002143
2144 if not self.success:
2145 # TODO(milleral) http://crbug.com/231452
2146 # In our own setup, we don't really use the results
2147 # repository, so I *think* this call can be elided. However, I'd
2148 # like to limit what I can possibly break for now, and it would be
2149 # called if I called PreJobTask's epilog, so I'm keeping the call
2150 # to it for now.
2151 self._copy_to_results_repository()
2152 # _actually_fail_queue_entry() is a hack around the fact that we do
2153 # indeed want to abort the queue entry here, but the rest of the
2154 # scheduler code expects that we will reschedule onto some other
2155 # host.
2156 self._actually_fail_queue_entry()
2157 # This abort will mark the aborted bit on the HQE itself, to
2158 # signify that we're killing it. Technically it also will do
2159 # the recursive aborting of all child jobs, but that shouldn't
2160 # matter here, as only suites have children, and those
2161 # are hostless and thus don't have provisioning.
Alex Millerf314b262013-08-07 22:54:10 -07002162 # TODO(milleral) http://crbug.com/188217
2163 # However, we can't actually do this yet, as if we set the abort bit
2164 # the FinalReparseTask will set the status of the HQE to ABORTED,
2165 # which then means that we don't show the status in run_suite.
2166 # So in the meantime, don't mark the HQE as aborted.
Alex Millerdfff2fd2013-05-28 13:05:06 -07002167 queue_entry = models.HostQueueEntry.objects.get(
2168 id=self.queue_entry.id)
Alex Millerf314b262013-08-07 22:54:10 -07002169 # queue_entry.abort()
2170
Alex Millerdfff2fd2013-05-28 13:05:06 -07002171 # The machine is in some totally unknown state, so let's kick off
2172 # a repair task to get it back to some known sane state.
2173 models.SpecialTask.objects.create(
2174 host=models.Host.objects.get(id=self.host.id),
2175 task=models.SpecialTask.Task.REPAIR,
2176 queue_entry=queue_entry,
2177 requested_by=self.task.requested_by)
2178 elif self._should_pending():
2179 self.queue_entry.on_pending()
2180 else:
2181 self.host.set_status(models.Host.Status.READY)
2182
2183
showarda9545c02009-12-18 22:44:26 +00002184class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
2185 """
2186 Common functionality for QueueTask and HostlessQueueTask
2187 """
2188 def __init__(self, queue_entries):
2189 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00002190 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00002191 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00002192
2193
showard73ec0442009-02-07 02:05:20 +00002194 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00002195 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00002196
2197
jamesrenc44ae992010-02-19 00:12:54 +00002198 def _write_control_file(self, execution_path):
2199 control_path = _drone_manager.attach_file_to_execution(
2200 execution_path, self.job.control_file)
2201 return control_path
2202
2203
Aviv Keshet308e7362013-05-21 14:43:16 -07002204 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00002205 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00002206 execution_path = self.queue_entries[0].execution_path()
2207 control_path = self._write_control_file(execution_path)
2208 hostnames = ','.join(entry.host.hostname
2209 for entry in self.queue_entries
2210 if not entry.is_hostless())
2211
2212 execution_tag = self.queue_entries[0].execution_tag()
2213 params = _autoserv_command_line(
2214 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07002215 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00002216 _drone_manager.absolute_path(control_path)],
2217 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07002218 if self.job.is_image_update_job():
2219 params += ['--image', self.job.update_image_path]
2220
jamesrenc44ae992010-02-19 00:12:54 +00002221 return params
showardd1195652009-12-08 22:21:02 +00002222
2223
2224 @property
2225 def num_processes(self):
2226 return len(self.queue_entries)
2227
2228
2229 @property
2230 def owner_username(self):
2231 return self.job.owner
2232
2233
2234 def _working_directory(self):
2235 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00002236
2237
jadmanski0afbb632008-06-06 21:10:57 +00002238 def prolog(self):
showardd9205182009-04-27 20:09:55 +00002239 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00002240 keyval_dict = self.job.keyval_dict()
2241 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00002242 group_name = self.queue_entries[0].get_group_name()
2243 if group_name:
2244 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00002245 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00002246 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002247 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00002248 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00002249
2250
showard35162b02009-03-03 02:17:30 +00002251 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00002252 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00002253 _drone_manager.write_lines_to_file(error_file_path,
2254 [_LOST_PROCESS_ERROR])
2255
2256
showardd3dc1992009-04-22 21:01:40 +00002257 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00002258 if not self.monitor:
2259 return
2260
showardd9205182009-04-27 20:09:55 +00002261 self._write_job_finished()
2262
showard35162b02009-03-03 02:17:30 +00002263 if self.monitor.lost_process:
2264 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00002265
jadmanskif7fa2cc2008-10-01 14:13:23 +00002266
showardcbd74612008-11-19 21:42:02 +00002267 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00002268 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00002269 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00002270 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00002271 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00002272
2273
jadmanskif7fa2cc2008-10-01 14:13:23 +00002274 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00002275 if not self.monitor or not self.monitor.has_process():
2276 return
2277
jadmanskif7fa2cc2008-10-01 14:13:23 +00002278 # build up sets of all the aborted_by and aborted_on values
2279 aborted_by, aborted_on = set(), set()
2280 for queue_entry in self.queue_entries:
2281 if queue_entry.aborted_by:
2282 aborted_by.add(queue_entry.aborted_by)
2283 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
2284 aborted_on.add(t)
2285
2286 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00002287 # TODO(showard): this conditional is now obsolete, we just need to leave
2288 # it in temporarily for backwards compatibility over upgrades. delete
2289 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00002290 assert len(aborted_by) <= 1
2291 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00002292 aborted_by_value = aborted_by.pop()
2293 aborted_on_value = max(aborted_on)
2294 else:
2295 aborted_by_value = 'autotest_system'
2296 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00002297
showarda0382352009-02-11 23:36:43 +00002298 self._write_keyval_after_job("aborted_by", aborted_by_value)
2299 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00002300
showardcbd74612008-11-19 21:42:02 +00002301 aborted_on_string = str(datetime.datetime.fromtimestamp(
2302 aborted_on_value))
2303 self._write_status_comment('Job aborted by %s on %s' %
2304 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00002305
2306
jadmanski0afbb632008-06-06 21:10:57 +00002307 def abort(self):
showarda9545c02009-12-18 22:44:26 +00002308 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00002309 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00002310 self._finish_task()
showard21baa452008-10-21 00:08:39 +00002311
2312
jadmanski0afbb632008-06-06 21:10:57 +00002313 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00002314 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002315 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002316
2317
2318class QueueTask(AbstractQueueTask):
2319 def __init__(self, queue_entries):
2320 super(QueueTask, self).__init__(queue_entries)
2321 self._set_ids(queue_entries=queue_entries)
2322
2323
2324 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002325 self._check_queue_entry_statuses(
2326 self.queue_entries,
2327 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
2328 models.HostQueueEntry.Status.RUNNING),
2329 allowed_host_statuses=(models.Host.Status.PENDING,
2330 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00002331
2332 super(QueueTask, self).prolog()
2333
2334 for queue_entry in self.queue_entries:
2335 self._write_host_keyvals(queue_entry.host)
2336 queue_entry.host.set_status(models.Host.Status.RUNNING)
2337 queue_entry.host.update_field('dirty', 1)
showarda9545c02009-12-18 22:44:26 +00002338
2339
2340 def _finish_task(self):
2341 super(QueueTask, self)._finish_task()
2342
2343 for queue_entry in self.queue_entries:
2344 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00002345 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00002346
2347
Alex Miller9f01d5d2013-08-08 02:26:01 -07002348 def _command_line(self):
2349 invocation = super(QueueTask, self)._command_line()
2350 return invocation + ['--verify_job_repo_url']
2351
2352
Alex Millerc23a7f02013-08-27 17:36:42 -07002353class HostlessQueueTask(SelfThrottledTask, AbstractQueueTask):
mbligh4608b002010-01-05 18:22:35 +00002354 def __init__(self, queue_entry):
2355 super(HostlessQueueTask, self).__init__([queue_entry])
2356 self.queue_entry_ids = [queue_entry.id]
2357
2358
2359 def prolog(self):
2360 self.queue_entries[0].update_field('execution_subdir', 'hostless')
2361 super(HostlessQueueTask, self).prolog()
2362
2363
mbligh4608b002010-01-05 18:22:35 +00002364 def _finish_task(self):
2365 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00002366 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00002367
2368
Alex Millerc23a7f02013-08-27 17:36:42 -07002369 @classmethod
2370 def _max_processes(cls):
2371 return scheduler_config.config.max_hostless_processes
2372
2373
showardd3dc1992009-04-22 21:01:40 +00002374class PostJobTask(AgentTask):
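    """Base class for tasks that run over a job's results after the main
    autoserv process has finished (crashinfo collection, parsing, archiving).
    """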
showardd1195652009-12-08 22:21:02 +00002375 def __init__(self, queue_entries, log_file_name):
2376 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00002377
showardd1195652009-12-08 22:21:02 +00002378 self.queue_entries = queue_entries
2379
showardd3dc1992009-04-22 21:01:40 +00002380 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00002381 self._autoserv_monitor.attach_to_existing_process(
2382 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00002383
showardd1195652009-12-08 22:21:02 +00002384
2385 def _command_line(self):
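        # in testing mode, substitute the no-op shell command 'true'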
showardd3dc1992009-04-22 21:01:40 +00002386 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00002387 return 'true'
2388 return self._generate_command(
2389 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00002390
2391
2392 def _generate_command(self, results_dir):
2393 raise NotImplementedError('Subclasses must override this')
2394
2395
showardd1195652009-12-08 22:21:02 +00002396 @property
2397 def owner_username(self):
2398 return self.queue_entries[0].job.owner
2399
2400
2401 def _working_directory(self):
2402 return self._get_consistent_execution_path(self.queue_entries)
2403
2404
2405 def _paired_with_monitor(self):
2406 return self._autoserv_monitor
2407
2408
showardd3dc1992009-04-22 21:01:40 +00002409 def _job_was_aborted(self):
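        # all of a job's queue entries should agree on aborted state; warn on
        # inconsistency and err on the side of treating the job as aborted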
2410 was_aborted = None
showardd1195652009-12-08 22:21:02 +00002411 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002412 queue_entry.update_from_database()
2413 if was_aborted is None: # first queue entry
2414 was_aborted = bool(queue_entry.aborted)
2415 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
jamesren17cadd62010-06-16 23:26:55 +00002416 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
2417 for entry in self.queue_entries]
showardd3dc1992009-04-22 21:01:40 +00002418 email_manager.manager.enqueue_notify_email(
jamesren17cadd62010-06-16 23:26:55 +00002419 'Inconsistent abort state',
2420 'Queue entries have inconsistent abort state:\n' +
2421 '\n'.join(entries))
showardd3dc1992009-04-22 21:01:40 +00002422 # don't crash here, just assume true
2423 return True
2424 return was_aborted
2425
2426
showardd1195652009-12-08 22:21:02 +00002427 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002428 if self._job_was_aborted():
2429 return models.HostQueueEntry.Status.ABORTED
2430
2431 # we'll use a PidfileRunMonitor to read the autoserv exit status
2432 if self._autoserv_monitor.exit_code() == 0:
2433 return models.HostQueueEntry.Status.COMPLETED
2434 return models.HostQueueEntry.Status.FAILED
2435
2436
showardd3dc1992009-04-22 21:01:40 +00002437 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002438 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002439 queue_entry.set_status(status)
2440
2441
2442 def abort(self):
2443 # override AgentTask.abort() to avoid killing the process and ending
2444 # the task. post-job tasks continue when the job is aborted.
2445 pass
2446
2447
mbligh4608b002010-01-05 18:22:35 +00002448 def _pidfile_label(self):
2449 # '.autoserv_execute' -> 'autoserv'
2450 return self._pidfile_name()[1:-len('_execute')]
2451
2452
showard9bb960b2009-11-19 01:02:11 +00002453class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002454 """
2455 Task responsible for
2456 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2457 * copying logs to the results repository
2458 * spawning CleanupTasks for hosts, if necessary
2459 * spawning a FinalReparseTask for the job
2460 """
showardd1195652009-12-08 22:21:02 +00002461 def __init__(self, queue_entries, recover_run_monitor=None):
2462 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002463 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002464 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002465 self._set_ids(queue_entries=queue_entries)
2466
2467
Aviv Keshet308e7362013-05-21 14:43:16 -07002468 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd3dc1992009-04-22 21:01:40 +00002469 def _generate_command(self, results_dir):
2470 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002471 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00002472        return [_autoserv_path, '-p',
2473 '--pidfile-label=%s' % self._pidfile_label(),
2474 '--use-existing-results', '--collect-crashinfo',
2475 '-m', host_list, '-r', results_dir]
showardd3dc1992009-04-22 21:01:40 +00002476
2477
showardd1195652009-12-08 22:21:02 +00002478 @property
2479 def num_processes(self):
2480 return len(self.queue_entries)
2481
2482
2483 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002484 return drone_manager.CRASHINFO_PID_FILE
showardd1195652009-12-08 22:21:02 +00002485
2486
showardd3dc1992009-04-22 21:01:40 +00002487 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002488 self._check_queue_entry_statuses(
2489 self.queue_entries,
2490 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
2491 allowed_host_statuses=(models.Host.Status.RUNNING,))
showard8cc058f2009-09-08 16:26:33 +00002492
showardd3dc1992009-04-22 21:01:40 +00002493 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002494
2495
showardd3dc1992009-04-22 21:01:40 +00002496 def epilog(self):
2497 super(GatherLogsTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002498 self._parse_results(self.queue_entries)
showard9bb960b2009-11-19 01:02:11 +00002499 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002500
showard9bb960b2009-11-19 01:02:11 +00002501
2502 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002503 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002504 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002505 models.HostQueueEntry.Status.COMPLETED)
2506 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2507 else:
2508 final_success = False
2509 num_tests_failed = 0
showard9bb960b2009-11-19 01:02:11 +00002510 reboot_after = self._job.reboot_after
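        # reboot when the job was aborted, when the job requests it
        # unconditionally or after an all-pass run, or when any test failed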
2511 do_reboot = (
2512 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002513 self._final_status() == models.HostQueueEntry.Status.ABORTED
jamesrendd855242010-03-02 22:23:44 +00002514 or reboot_after == model_attributes.RebootAfter.ALWAYS
2515 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
Dan Shi07e09af2013-04-12 09:31:29 -07002516 and final_success and num_tests_failed == 0)
2517 or num_tests_failed > 0)
showard9bb960b2009-11-19 01:02:11 +00002518
showardd1195652009-12-08 22:21:02 +00002519 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002520 if do_reboot:
2521 # don't pass the queue entry to the CleanupTask. if the cleanup
2522 # fails, the job doesn't care -- it's over.
2523 models.SpecialTask.objects.create(
2524 host=models.Host.objects.get(id=queue_entry.host.id),
2525 task=models.SpecialTask.Task.CLEANUP,
2526 requested_by=self._job.owner_model())
2527 else:
2528 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002529
2530
showard0bbfc212009-04-29 21:06:13 +00002531 def run(self):
showard597bfd32009-05-08 18:22:50 +00002532 autoserv_exit_code = self._autoserv_monitor.exit_code()
2533 # only run if Autoserv exited due to some signal. if we have no exit
2534 # code, assume something bad (and signal-like) happened.
2535 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002536 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002537 else:
2538 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002539
2540
Alex Millerc23a7f02013-08-27 17:36:42 -07002541class FinalReparseTask(SelfThrottledTask, PostJobTask):
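    """Runs the results parser over a job's results directory; throttled by
    max_parse_processes.
    """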
showardd1195652009-12-08 22:21:02 +00002542 def __init__(self, queue_entries):
2543 super(FinalReparseTask, self).__init__(queue_entries,
2544 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002545 # don't use _set_ids, since we don't want to set the host_ids
2546 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002547
2548
2549 def _generate_command(self, results_dir):
mbligh4608b002010-01-05 18:22:35 +00002550 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd1195652009-12-08 22:21:02 +00002551 results_dir]
2552
2553
2554 @property
2555 def num_processes(self):
2556 return 0 # don't include parser processes in accounting
2557
2558
2559 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002560 return drone_manager.PARSER_PID_FILE
showardd1195652009-12-08 22:21:02 +00002561
2562
showard97aed502008-11-04 02:01:24 +00002563 @classmethod
mbligh4608b002010-01-05 18:22:35 +00002564 def _max_processes(cls):
2565 return scheduler_config.config.max_parse_processes
showard97aed502008-11-04 02:01:24 +00002566
2567
2568 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002569 self._check_queue_entry_statuses(
2570 self.queue_entries,
2571 allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
showard8cc058f2009-09-08 16:26:33 +00002572
showard97aed502008-11-04 02:01:24 +00002573 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002574
2575
2576 def epilog(self):
2577 super(FinalReparseTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002578 self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002579
2580
Alex Millerc23a7f02013-08-27 17:36:42 -07002581class ArchiveResultsTask(SelfThrottledTask, PostJobTask):
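    """Runs autoserv with the archive_results control file to archive a job's
    results; throttled by max_transfer_processes.
    """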
showarde1575b52010-01-15 00:21:12 +00002582 _ARCHIVING_FAILED_FILE = '.archiver_failed'
2583
mbligh4608b002010-01-05 18:22:35 +00002584 def __init__(self, queue_entries):
2585 super(ArchiveResultsTask, self).__init__(queue_entries,
2586 log_file_name='.archiving.log')
2587 # don't use _set_ids, since we don't want to set the host_ids
2588 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00002589
2590
mbligh4608b002010-01-05 18:22:35 +00002591 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002592 return drone_manager.ARCHIVER_PID_FILE
showard97aed502008-11-04 02:01:24 +00002593
2594
Aviv Keshet308e7362013-05-21 14:43:16 -07002595 # TODO: Refactor into autoserv_utils. crbug.com/243090
mbligh4608b002010-01-05 18:22:35 +00002596 def _generate_command(self, results_dir):
2597        return [_autoserv_path, '-p',
2598 '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
mblighe0cbc912010-03-11 18:03:07 +00002599 '--use-existing-results', '--control-filename=control.archive',
showard948eb302010-01-15 00:16:20 +00002600 os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
2601 'archive_results.control.srv')]
showard97aed502008-11-04 02:01:24 +00002602
2603
mbligh4608b002010-01-05 18:22:35 +00002604 @classmethod
2605 def _max_processes(cls):
2606 return scheduler_config.config.max_transfer_processes
showarda9545c02009-12-18 22:44:26 +00002607
2608
2609 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002610 self._check_queue_entry_statuses(
2611 self.queue_entries,
2612 allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
2613
2614 super(ArchiveResultsTask, self).prolog()
showarda9545c02009-12-18 22:44:26 +00002615
2616
mbligh4608b002010-01-05 18:22:35 +00002617 def epilog(self):
2618 super(ArchiveResultsTask, self).epilog()
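        # on failure, leave a marker file in the results dir recording the
        # archiver's exit code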
showard4076c632010-01-15 20:28:49 +00002619 if not self.success and self._paired_with_monitor().has_process():
showarde1575b52010-01-15 00:21:12 +00002620 failed_file = os.path.join(self._working_directory(),
2621 self._ARCHIVING_FAILED_FILE)
2622 paired_process = self._paired_with_monitor().get_process()
2623 _drone_manager.write_lines_to_file(
2624 failed_file, ['Archiving failed with exit code %s'
2625 % self.monitor.exit_code()],
2626 paired_with_process=paired_process)
mbligh4608b002010-01-05 18:22:35 +00002627 self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002628
2629
mbligh36768f02008-02-22 18:28:33 +00002630if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002631 main()