blob: d6febbf5716c3b1fac1872eb098dfed3e354eb2a [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
Aviv Keshet225bdfe2013-03-05 10:10:08 -08002#pylint: disable-msg=C0111
mbligh36768f02008-02-22 18:28:33 +00003
4"""
5Autotest scheduler
6"""
showard909c7a62008-07-15 21:52:38 +00007
mbligh36768f02008-02-22 18:28:33 +00008
Aviv Keshet225bdfe2013-03-05 10:10:08 -08009import datetime, optparse, os, signal
10import sys, time, traceback, urllib
11import logging, gc
showard402934a2009-12-21 22:20:47 +000012
Alex Miller05d7b4c2013-03-04 07:49:38 -080013import common
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000015
16import django.db
17
showard136e6dc2009-06-10 19:38:49 +000018from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000019from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000020from autotest_lib.database import database_connection
jamesrendd855242010-03-02 22:23:44 +000021from autotest_lib.frontend.afe import model_attributes
Alex Miller05d7b4c2013-03-04 07:49:38 -080022from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000023from autotest_lib.scheduler import drone_manager, drones, email_manager
Dale Curtisaa513362011-03-01 17:27:44 -080024from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
Alex Miller05d7b4c2013-03-04 07:49:38 -080025from autotest_lib.scheduler import scheduler_logging_config
jamesrenc44ae992010-02-19 00:12:54 +000026from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080027from autotest_lib.scheduler import status_server, scheduler_config
Aviv Keshet308e7362013-05-21 14:43:16 -070028from autotest_lib.server import autoserv_utils
Alex Millerdfff2fd2013-05-28 13:05:06 -070029from autotest_lib.server.cros import provision
Fang Deng1d6c2a02013-04-17 15:25:45 -070030from autotest_lib.site_utils.graphite import stats
Alex Miller05d7b4c2013-03-04 07:49:38 -080031
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# AUTOTEST_DIR in the environment overrides the path derived from __file__.
# Membership test replaces the deprecated dict.has_key(), which was removed
# in Python 3.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000047
# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None              # DatabaseConnection; assigned in initialize()
_shutdown = False       # set True by handle_sigint() to stop the tick loop
_autoserv_directory = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server')
_autoserv_path = os.path.join(_autoserv_directory, 'autoserv')
_testing_mode = False   # set True under --test; swaps in a dummy autoserv
_drone_manager = None   # drone_manager singleton; assigned in initialize_globals()
mbligh36768f02008-02-22 18:28:33 +000061
Eric Lie0493a42010-11-15 13:05:43 -080062def _parser_path_default(install_dir):
63 return os.path.join(install_dir, 'tko', 'parse')
# Resolve the TKO parser path through the optional site-specific hook; falls
# back to _parser_path_default (<install_dir>/tko/parse) when no site module
# provides a 'parser_path' override.
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
68
mbligh36768f02008-02-22 18:28:33 +000069
def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * timeout_mins
75
76
mbligh83c1e9e2009-05-01 23:10:41 +000077def _site_init_monitor_db_dummy():
78 return {}
79
80
def _verify_default_drone_set_exists():
    """Raise SchedulerError when drone sets are enabled without a default."""
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise host_scheduler.SchedulerError(
            'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000086
87
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    # Currently the only startup consistency check is the drone-set default.
    _verify_default_drone_set_exists()
91
92
def main():
    """Entry point: run the scheduler and log any escaping exception.

    The pid file written during startup is always removed on exit, whether
    the scheduler stops normally or dies with an exception.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # Deliberate exits (argument errors, duplicate instance) pass
            # through without being logged as failures.
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000104
105
def main_without_exception_handling():
    """Parse command-line options and run the scheduler's main tick loop.

    Exits with status 1 when the scheduler is disabled in the global config.
    Any exception escaping the tick loop is captured by email_manager; the
    status server, drone manager and database are then shut down in order.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Run the optional site hook before any other setup.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Loop until either SIGINT sets _shutdown or the status server
        # requests a scheduler shutdown.
        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000172
173
def setup_logging():
    """Configure scheduler logging, honoring AUTOTEST_SCHEDULER_LOG_* env vars."""
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    config = scheduler_logging_config.SchedulerLoggingConfig()
    logging_manager.configure_logging(config, log_dir=log_dir,
                                      logfile_name=log_name)
180
181
def handle_sigint(signum, frame):
    """SIGINT handler: request the main tick loop to exit after this tick."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000186
187
def initialize():
    """Perform one-time process setup before the dispatcher starts.

    Aborts (exit 1) if another monitor_db instance holds the pid file.
    Connects the scheduler database, configures Django autocommit, installs
    the SIGINT handler, and initializes the drone manager from the
    'drones'/'results_host' config values.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to run two schedulers against the same results/pid directory.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point at the stress-test database instead of the real one.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    # Drone list and results host come from the scheduler config section;
    # both default to localhost.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000225
226
def initialize_globals():
    """Bind the module-level _drone_manager to the DroneManager singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
230
231
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - bool - Whether autoserv should run verbosely.
    """
    # Thin wrapper: all command construction lives in autoserv_utils so other
    # callers (e.g. special tasks) build identical command lines.
    return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
showard87ba02a2009-04-20 19:37:32 +0000249
250
Simran Basia858a232012-08-21 11:04:37 -0700251class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800252
253
    def __init__(self):
        # Active Agent objects being driven by this dispatcher.
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # Reverse maps: host id -> set of agents, queue entry id -> set of
        # agents; maintained by add_agent_task/remove_agent.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        # Interval between GC stats reports, in seconds (config value is in
        # minutes; defaults to 6 hours).
        self._seconds_between_garbage_stats = 60 * (
            global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION,
                'gc_stats_interval_mins', type=int, default=6*60))
        # Debug-logging switches for per-tick and extra-verbose messages.
        self._tick_debug = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
            default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
            default=False)
mbligh36768f02008-02-22 18:28:33 +0000276
mbligh36768f02008-02-22 18:28:33 +0000277
    def initialize(self, recover_hosts=True):
        """Run recovery steps before the first tick.

        @param recover_hosts: if True, also attempt to recover dead hosts.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()
289
mbligh36768f02008-02-22 18:28:33 +0000290
Simran Basi0ec94dd2012-08-28 09:50:10 -0700291 def _log_tick_msg(self, msg):
292 if self._tick_debug:
293 logging.debug(msg)
294
295
Simran Basidef92872012-09-20 13:34:34 -0700296 def _log_extra_msg(self, msg):
297 if self._extra_debugging:
298 logging.debug(msg)
299
300
    def tick(self):
        """
        Run one scheduler iteration.

        Each major step is announced via _log_tick_msg (when tick_debug is
        enabled) so slow ticks can be attributed to a step. The email-send
        and Django query-reset steps are additionally timed through the
        stats.Timer('scheduler.tick') client.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000342
showard97aed502008-11-04 02:01:24 +0000343
    def _run_cleanup(self):
        """Give both cleanup tasks a chance to run; each checks its own timer."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000347
mbligh36768f02008-02-22 18:28:33 +0000348
showardf13a9e22009-12-18 22:54:09 +0000349 def _garbage_collection(self):
350 threshold_time = time.time() - self._seconds_between_garbage_stats
351 if threshold_time < self._last_garbage_stats_time:
352 # Don't generate these reports very often.
353 return
354
355 self._last_garbage_stats_time = time.time()
356 # Force a full level 0 collection (because we can, it doesn't hurt
357 # at this interval).
358 gc.collect()
359 logging.info('Logging garbage collector stats on tick %d.',
360 self._tick_count)
361 gc_stats._log_garbage_collector_stats()
362
363
showard170873e2009-01-07 00:22:26 +0000364 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
365 for object_id in object_ids:
366 agent_dict.setdefault(object_id, set()).add(agent)
367
368
369 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
370 for object_id in object_ids:
371 assert object_id in agent_dict
372 agent_dict[object_id].remove(agent)
373
374
    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatchers list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to 1. a dict against the queue_entry_ids given to it 2. A dict
        against the host_ids given to it. So theoretically, a host can have any
        number of agents associated with it, and each of them can have any
        special agent task, though in practice we never see > 1 agent/task per
        host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        # The agent keeps a back-reference so it can remove itself later.
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000395
showard170873e2009-01-07 00:22:26 +0000396
397 def get_agents_for_entry(self, queue_entry):
398 """
399 Find agents corresponding to the specified queue_entry.
400 """
showardd3dc1992009-04-22 21:01:40 +0000401 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000402
403
404 def host_has_agent(self, host):
405 """
406 Determine if there is currently an Agent present using this host.
407 """
408 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000409
410
    def remove_agent(self, agent):
        # Drop the agent from the master list and from both reverse maps so
        # its hosts and queue entries become eligible for new agents.
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000417
418
    def _host_has_scheduled_special_task(self, host):
        # True when the host has at least one queued (neither active nor
        # complete) SpecialTask row.
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))
423
424
    def _recover_processes(self):
        """Reattach to autoserv processes left over from a previous scheduler.

        Registers pidfiles before the drone refresh so the refresh picks up
        their contents, then recovers each task and the remaining entries.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000437
showard170873e2009-01-07 00:22:26 +0000438
showardd1195652009-12-08 22:21:02 +0000439 def _create_recovery_agent_tasks(self):
440 return (self._get_queue_entry_agent_tasks()
441 + self._get_special_task_agent_tasks(is_active=True))
442
443
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
            where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            # Mark every entry in the task's group as consumed so sibling
            # entries of a synchronous job aren't given a second task.
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000480
481
    def _get_special_task_agent_tasks(self, is_active=False):
        """Build an AgentTask for every incomplete SpecialTask.

        @param is_active: filter SpecialTasks on their is_active flag.
        @return: a list of AgentTasks, one per matching SpecialTask.
        """
        special_tasks = models.SpecialTask.objects.filter(
            is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]
487
488
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        Maps entry status to task type: Starting/Running -> QueueTask (or
        HostlessQueueTask), Gathering -> GatherLogsTask, Parsing ->
        FinalReparseTask, Archiving -> ArchiveResultsTask.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        @raises host_scheduler.SchedulerError: for any other status.
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
            '_get_agent_task_for_queue_entry got entry with '
            'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000514
515
    def _check_for_duplicate_host_entries(self, task_entries):
        # Parsing/archiving entries no longer occupy their host, so they are
        # exempt from the one-agent-per-host assertion below.
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)
524
525
    def _assert_host_has_no_agent(self, entry):
        """Raise SchedulerError if the entry's host already has an agent.

        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            # Pick an arbitrary existing agent to name in the error message.
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                'While scheduling %s, host %s already has a host agent %s'
                % (entry, entry.host, agent.task))
535
536
    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
        object contains a hqe it's passed on to the special agent task, which
        creates a HostQueueEntry and saves it as its queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        @raises host_scheduler.SchedulerError: if no AgentTask class matches
                the task's type.
        """
        self._assert_host_has_no_agent(special_task)

        # Dispatch on each class's TASK_TYPE marker.
        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask,
                                      ResetTask, ProvisionTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
            'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000563
564
565 def _register_pidfiles(self, agent_tasks):
566 for agent_task in agent_tasks:
567 agent_task.register_necessary_pidfiles()
568
569
    def _recover_tasks(self, agent_tasks):
        """Recover each AgentTask and account for orphaned autoserv processes.

        Any orphan process claimed by a recovered task's monitor is removed
        from the orphan set; whatever remains is reported (and may be fatal,
        per config) by _check_for_remaining_orphan_processes.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000580
581
showard8cc058f2009-09-08 16:26:33 +0000582 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000583 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
584 % status):
showard0db3d432009-10-12 20:29:15 +0000585 if entry.status == status and not self.get_agents_for_entry(entry):
586 # The status can change during iteration, e.g., if job.run()
587 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000588 yield entry
589
590
    def _check_for_remaining_orphan_processes(self, orphans):
        """Report orphan autoserv processes that recovery could not claim.

        Always emails a notification; additionally raises RuntimeError when
        the 'die_on_orphans' config flag is set.
        """
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000603
showard170873e2009-01-07 00:22:26 +0000604
    def _recover_pending_entries(self):
        """Re-trigger on_pending() for Pending entries that lost their agent."""
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()
610
611
showardb8900452009-10-12 20:31:01 +0000612 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000613 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000614 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
615 unrecovered_hqes = []
616 for queue_entry in queue_entries:
617 special_tasks = models.SpecialTask.objects.filter(
618 task__in=(models.SpecialTask.Task.CLEANUP,
619 models.SpecialTask.Task.VERIFY),
620 queue_entry__id=queue_entry.id,
621 is_complete=False)
622 if special_tasks.count() == 0:
623 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000624
showardb8900452009-10-12 20:31:01 +0000625 if unrecovered_hqes:
626 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Dale Curtisaa513362011-03-01 17:27:44 -0800627 raise host_scheduler.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000628 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000629 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000630
631
showard65db3932009-10-28 19:54:35 +0000632 def _get_prioritized_special_tasks(self):
633 """
634 Returns all queued SpecialTasks prioritized for repair first, then
635 cleanup, then verify.
beeps8bb1f7d2013-08-05 01:30:09 -0700636
637 @return: list of afe.models.SpecialTasks sorted according to priority.
showard65db3932009-10-28 19:54:35 +0000638 """
639 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
640 is_complete=False,
641 host__locked=False)
642 # exclude hosts with active queue entries unless the SpecialTask is for
643 # that queue entry
showard7e67b432010-01-20 01:13:04 +0000644 queued_tasks = models.SpecialTask.objects.add_join(
showardeab66ce2009-12-23 00:03:56 +0000645 queued_tasks, 'afe_host_queue_entries', 'host_id',
646 join_condition='afe_host_queue_entries.active',
showard7e67b432010-01-20 01:13:04 +0000647 join_from_key='host_id', force_left_join=True)
showard65db3932009-10-28 19:54:35 +0000648 queued_tasks = queued_tasks.extra(
showardeab66ce2009-12-23 00:03:56 +0000649 where=['(afe_host_queue_entries.id IS NULL OR '
650 'afe_host_queue_entries.id = '
651 'afe_special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000652
showard65db3932009-10-28 19:54:35 +0000653 # reorder tasks by priority
654 task_priority_order = [models.SpecialTask.Task.REPAIR,
655 models.SpecialTask.Task.CLEANUP,
Dan Shi07e09af2013-04-12 09:31:29 -0700656 models.SpecialTask.Task.VERIFY,
Alex Millerdfff2fd2013-05-28 13:05:06 -0700657 models.SpecialTask.Task.RESET,
658 models.SpecialTask.Task.PROVISION]
showard65db3932009-10-28 19:54:35 +0000659 def task_priority_key(task):
660 return task_priority_order.index(task.task)
661 return sorted(queued_tasks, key=task_priority_key)
662
663
showard65db3932009-10-28 19:54:35 +0000664 def _schedule_special_tasks(self):
665 """
666 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700667
668 Special tasks include PreJobTasks like verify, reset and cleanup.
669 They are created through _schedule_new_jobs and associated with a hqe
670 This method translates SpecialTasks to the appropriate AgentTask and
671 adds them to the dispatchers agents list, so _handle_agents can execute
672 them.
showard65db3932009-10-28 19:54:35 +0000673 """
674 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000675 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000676 continue
showardd1195652009-12-08 22:21:02 +0000677 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000678
679
showard170873e2009-01-07 00:22:26 +0000680 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000681 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000682 # should never happen
showarded2afea2009-07-07 20:54:07 +0000683 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000684 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000685 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700686 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000687 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000688
689
jadmanski0afbb632008-06-06 21:10:57 +0000690 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000691 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700692 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000693 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000694 if self.host_has_agent(host):
695 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000696 continue
showard8cc058f2009-09-08 16:26:33 +0000697 if self._host_has_scheduled_special_task(host):
698 # host will have a special task scheduled on the next cycle
699 continue
showard170873e2009-01-07 00:22:26 +0000700 if print_message:
showardb18134f2009-03-20 20:52:18 +0000701 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000702 models.SpecialTask.objects.create(
703 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000704 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000705
706
jadmanski0afbb632008-06-06 21:10:57 +0000707 def _recover_hosts(self):
708 # recover "Repair Failed" hosts
709 message = 'Reverifying dead host %s'
710 self._reverify_hosts_where("status = 'Repair Failed'",
711 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000712
713
showard04c82c52008-05-29 19:38:12 +0000714
showardb95b1bd2008-08-15 18:11:04 +0000715 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000716 # prioritize by job priority, then non-metahost over metahost, then FIFO
jamesrenc44ae992010-02-19 00:12:54 +0000717 return list(scheduler_models.HostQueueEntry.fetch(
showardeab66ce2009-12-23 00:03:56 +0000718 joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000719 where='NOT complete AND NOT active AND status="Queued"',
showardeab66ce2009-12-23 00:03:56 +0000720 order_by='afe_jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000721
722
showard89f84db2009-03-12 20:39:13 +0000723 def _refresh_pending_queue_entries(self):
724 """
725 Lookup the pending HostQueueEntries and call our HostScheduler
726 refresh() method given that list. Return the list.
727
728 @returns A list of pending HostQueueEntries sorted in priority order.
729 """
showard63a34772008-08-18 19:32:50 +0000730 queue_entries = self._get_pending_queue_entries()
731 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000732 return []
showardb95b1bd2008-08-15 18:11:04 +0000733
showard63a34772008-08-18 19:32:50 +0000734 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000735
showard89f84db2009-03-12 20:39:13 +0000736 return queue_entries
737
738
739 def _schedule_atomic_group(self, queue_entry):
740 """
741 Schedule the given queue_entry on an atomic group of hosts.
742
743 Returns immediately if there are insufficient available hosts.
744
745 Creates new HostQueueEntries based off of queue_entry for the
746 scheduled hosts and starts them all running.
747 """
748 # This is a virtual host queue entry representing an entire
749 # atomic group, find a group and schedule their hosts.
750 group_hosts = self._host_scheduler.find_eligible_atomic_group(
751 queue_entry)
752 if not group_hosts:
753 return
showardcbe6f942009-06-17 19:33:49 +0000754
755 logging.info('Expanding atomic group entry %s with hosts %s',
756 queue_entry,
757 ', '.join(host.hostname for host in group_hosts))
jamesren883492a2010-02-12 00:45:18 +0000758
showard89f84db2009-03-12 20:39:13 +0000759 for assigned_host in group_hosts[1:]:
760 # Create a new HQE for every additional assigned_host.
jamesrenc44ae992010-02-19 00:12:54 +0000761 new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000762 new_hqe.save()
jamesren883492a2010-02-12 00:45:18 +0000763 new_hqe.set_host(assigned_host)
764 self._run_queue_entry(new_hqe)
765
766 # The first assigned host uses the original HostQueueEntry
767 queue_entry.set_host(group_hosts[0])
768 self._run_queue_entry(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000769
770
showarda9545c02009-12-18 22:44:26 +0000771 def _schedule_hostless_job(self, queue_entry):
772 self.add_agent_task(HostlessQueueTask(queue_entry))
jamesren47bd7372010-03-13 00:58:17 +0000773 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000774
775
showard89f84db2009-03-12 20:39:13 +0000776 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700777 """
778 Find any new HQEs and call schedule_pre_job_tasks for it.
779
780 This involves setting the status of the HQE and creating a row in the
781 db corresponding the the special task, through
782 scheduler_models._queue_special_task. The new db row is then added as
783 an agent to the dispatcher through _schedule_special_tasks and
784 scheduled for execution on the drone through _handle_agents.
785 """
showard89f84db2009-03-12 20:39:13 +0000786 queue_entries = self._refresh_pending_queue_entries()
787 if not queue_entries:
788 return
789
Simran Basi3f6717d2012-09-13 15:21:22 -0700790 logging.debug('Processing %d queue_entries', len(queue_entries))
showard63a34772008-08-18 19:32:50 +0000791 for queue_entry in queue_entries:
Simran Basidef92872012-09-20 13:34:34 -0700792 self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
showarde55955f2009-10-07 20:48:58 +0000793 is_unassigned_atomic_group = (
794 queue_entry.atomic_group_id is not None
795 and queue_entry.host_id is None)
jamesren883492a2010-02-12 00:45:18 +0000796
797 if queue_entry.is_hostless():
Simran Basidef92872012-09-20 13:34:34 -0700798 self._log_extra_msg('Scheduling hostless job.')
showarda9545c02009-12-18 22:44:26 +0000799 self._schedule_hostless_job(queue_entry)
jamesren883492a2010-02-12 00:45:18 +0000800 elif is_unassigned_atomic_group:
801 self._schedule_atomic_group(queue_entry)
showarde55955f2009-10-07 20:48:58 +0000802 else:
jamesren883492a2010-02-12 00:45:18 +0000803 assigned_host = self._host_scheduler.schedule_entry(queue_entry)
showard65db3932009-10-28 19:54:35 +0000804 if assigned_host and not self.host_has_agent(assigned_host):
jamesren883492a2010-02-12 00:45:18 +0000805 assert assigned_host.id == queue_entry.host_id
806 self._run_queue_entry(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000807
808
showard8cc058f2009-09-08 16:26:33 +0000809 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700810 """
811 Adds agents to the dispatcher.
812
813 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
814 QueueTask for example, will have a job with a control file, and
815 the agent will have methods that poll, abort and check if the queue
816 task is finished. The dispatcher runs the agent_task, as well as
817 other agents in it's _agents member, through _handle_agents, by
818 calling the Agents tick().
819
820 This method creates an agent for each HQE in one of (starting, running,
821 gathering, parsing, archiving) states, and adds it to the dispatcher so
822 it is handled by _handle_agents.
823 """
showardd1195652009-12-08 22:21:02 +0000824 for agent_task in self._get_queue_entry_agent_tasks():
825 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000826
827
828 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000829 for entry in scheduler_models.HostQueueEntry.fetch(
830 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000831 task = entry.job.schedule_delayed_callback_task(entry)
832 if task:
showardd1195652009-12-08 22:21:02 +0000833 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000834
835
jamesren883492a2010-02-12 00:45:18 +0000836 def _run_queue_entry(self, queue_entry):
Simran Basidef92872012-09-20 13:34:34 -0700837 self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
838 queue_entry)
jamesren883492a2010-02-12 00:45:18 +0000839 queue_entry.schedule_pre_job_tasks()
mblighd5c95802008-03-05 00:33:46 +0000840
841
jadmanski0afbb632008-06-06 21:10:57 +0000842 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700843 """
844 Looks through the afe_host_queue_entries for an aborted entry.
845
846 The aborted bit is set on an HQE in many ways, the most common
847 being when a user requests an abort through the frontend, which
848 results in an rpc from the afe to abort_host_queue_entries.
849 """
jamesrene7c65cb2010-06-08 20:38:10 +0000850 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000851 for entry in scheduler_models.HostQueueEntry.fetch(
852 where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +0000853 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +0000854 for agent in self.get_agents_for_entry(entry):
855 agent.abort()
856 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000857 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700858 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000859 for job in jobs_to_stop:
860 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000861
862
beeps8bb1f7d2013-08-05 01:30:09 -0700863 def _find_aborted_special_tasks(self):
864 """
865 Find SpecialTasks that have been marked for abortion.
866
867 Poll the database looking for SpecialTasks that are active
868 and have been marked for abortion, then abort them.
869 """
870
871 # The completed and active bits are very important when it comes
872 # to scheduler correctness. The active bit is set through the prolog
873 # of a special task, and reset through the cleanup method of the
874 # SpecialAgentTask. The cleanup is called both through the abort and
875 # epilog. The complete bit is set in several places, and in general
876 # a hanging job will have is_active=1 is_complete=0, while a special
877 # task which completed will have is_active=0 is_complete=1. To check
878 # aborts we directly check active because the complete bit is set in
879 # several places, including the epilog of agent tasks.
880 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
881 is_aborted=True)
882 for task in aborted_tasks:
883 # There are 2 ways to get the agent associated with a task,
884 # through the host and through the hqe. A special task
885 # always needs a host, but doesn't always need a hqe.
886 for agent in self._host_agents.get(task.host.id, []):
887 if isinstance(agent.task, SpecialAgentTask):
888
889 # The epilog preforms critical actions such as
890 # queueing the next SpecialTask, requeuing the
891 # hqe etc, however it doesn't actually kill the
892 # monitor process and set the 'done' bit. Epilogs
893 # assume that the job failed, and that the monitor
894 # process has already written an exit code. The
895 # done bit is a necessary condition for
896 # _handle_agents to schedule any more special
897 # tasks against the host, and it must be set
898 # in addition to is_active, is_complete and success.
899 agent.task.epilog()
900 agent.task.abort()
901
902
showard324bf812009-01-20 23:23:38 +0000903 def _can_start_agent(self, agent, num_started_this_cycle,
904 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000905 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000906 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000907 return True
908 # don't allow any nonzero-process agents to run after we've reached a
909 # limit (this avoids starvation of many-process agents)
910 if have_reached_limit:
911 return False
912 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000913 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000914 agent.task.owner_username,
915 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000916 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000917 return False
918 # if a single agent exceeds the per-cycle throttling, still allow it to
919 # run when it's the first agent in the cycle
920 if num_started_this_cycle == 0:
921 return True
922 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +0000923 if (num_started_this_cycle + agent.task.num_processes >
924 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000925 return False
926 return True
927
928
jadmanski0afbb632008-06-06 21:10:57 +0000929 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700930 """
931 Handles agents of the dispatcher.
932
933 Appropriate Agents are added to the dispatcher through
934 _schedule_running_host_queue_entries. These agents each
935 have a task. This method runs the agents task through
936 agent.tick() leading to:
937 agent.start
938 prolog -> AgentTasks prolog
939 For each queue entry:
940 sets host status/status to Running
941 set started_on in afe_host_queue_entries
942 run -> AgentTasks run
943 Creates PidfileRunMonitor
944 Queues the autoserv command line for this AgentTask
945 via the drone manager. These commands are executed
946 through the drone managers execute actions.
947 poll -> AgentTasks/BaseAgentTask poll
948 checks the monitors exit_code.
949 Executes epilog if task is finished.
950 Executes AgentTasks _finish_task
951 finish_task is usually responsible for setting the status
952 of the HQE/host, and updating it's active and complete fileds.
953
954 agent.is_done
955 Removed the agent from the dispatchers _agents queue.
956 Is_done checks the finished bit on the agent, that is
957 set based on the Agents task. During the agents poll
958 we check to see if the monitor process has exited in
959 it's finish method, and set the success member of the
960 task based on this exit code.
961 """
jadmanski0afbb632008-06-06 21:10:57 +0000962 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000963 have_reached_limit = False
964 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -0700965 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +0000966 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -0700967 self._log_extra_msg('Processing Agent with Host Ids: %s and '
968 'queue_entry ids:%s' % (agent.host_ids,
969 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +0000970 if not agent.started:
showard324bf812009-01-20 23:23:38 +0000971 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000972 have_reached_limit):
973 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -0700974 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +0000975 continue
showardd1195652009-12-08 22:21:02 +0000976 num_started_this_cycle += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -0700977 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +0000978 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -0700979 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +0000980 if agent.is_done():
Simran Basidef92872012-09-20 13:34:34 -0700981 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +0000982 self.remove_agent(agent)
Simran Basi3f6717d2012-09-13 15:21:22 -0700983 logging.info('%d running processes. %d added this cycle.',
984 _drone_manager.total_running_processes(),
985 num_started_this_cycle)
mbligh36768f02008-02-22 18:28:33 +0000986
987
showard29f7cd22009-04-29 21:16:24 +0000988 def _process_recurring_runs(self):
989 recurring_runs = models.RecurringRun.objects.filter(
990 start_date__lte=datetime.datetime.now())
991 for rrun in recurring_runs:
992 # Create job from template
993 job = rrun.job
994 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +0000995 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +0000996
997 host_objects = info['hosts']
998 one_time_hosts = info['one_time_hosts']
999 metahost_objects = info['meta_hosts']
1000 dependencies = info['dependencies']
1001 atomic_group = info['atomic_group']
1002
1003 for host in one_time_hosts or []:
1004 this_host = models.Host.create_one_time_host(host.hostname)
1005 host_objects.append(this_host)
1006
1007 try:
1008 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001009 options=options,
showard29f7cd22009-04-29 21:16:24 +00001010 host_objects=host_objects,
1011 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001012 atomic_group=atomic_group)
1013
1014 except Exception, ex:
1015 logging.exception(ex)
1016 #TODO send email
1017
1018 if rrun.loop_count == 1:
1019 rrun.delete()
1020 else:
1021 if rrun.loop_count != 0: # if not infinite loop
1022 # calculate new start_date
1023 difference = datetime.timedelta(seconds=rrun.loop_period)
1024 rrun.start_date = rrun.start_date + difference
1025 rrun.loop_count -= 1
1026 rrun.save()
1027
1028
Simran Basia858a232012-08-21 11:04:37 -07001029SiteDispatcher = utils.import_site_class(
1030 __file__, 'autotest_lib.scheduler.site_monitor_db',
1031 'SiteDispatcher', BaseDispatcher)
1032
1033class Dispatcher(SiteDispatcher):
1034 pass
1035
1036
showard170873e2009-01-07 00:22:26 +00001037class PidfileRunMonitor(object):
1038 """
1039 Client must call either run() to start a new process or
1040 attach_to_existing_process().
1041 """
mbligh36768f02008-02-22 18:28:33 +00001042
showard170873e2009-01-07 00:22:26 +00001043 class _PidfileException(Exception):
1044 """
1045 Raised when there's some unexpected behavior with the pid file, but only
1046 used internally (never allowed to escape this class).
1047 """
mbligh36768f02008-02-22 18:28:33 +00001048
1049
showard170873e2009-01-07 00:22:26 +00001050 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001051 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001052 self._start_time = None
1053 self.pidfile_id = None
1054 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001055
1056
showard170873e2009-01-07 00:22:26 +00001057 def _add_nice_command(self, command, nice_level):
1058 if not nice_level:
1059 return command
1060 return ['nice', '-n', str(nice_level)] + command
1061
1062
1063 def _set_start_time(self):
1064 self._start_time = time.time()
1065
1066
showard418785b2009-11-23 20:19:59 +00001067 def run(self, command, working_directory, num_processes, nice_level=None,
1068 log_file=None, pidfile_name=None, paired_with_pidfile=None,
jamesren76fcf192010-04-21 20:39:50 +00001069 username=None, drone_hostnames_allowed=None):
showard170873e2009-01-07 00:22:26 +00001070 assert command is not None
1071 if nice_level is not None:
1072 command = ['nice', '-n', str(nice_level)] + command
1073 self._set_start_time()
1074 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001075 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001076 num_processes=num_processes, log_file=log_file,
jamesren76fcf192010-04-21 20:39:50 +00001077 paired_with_pidfile=paired_with_pidfile, username=username,
1078 drone_hostnames_allowed=drone_hostnames_allowed)
showard170873e2009-01-07 00:22:26 +00001079
1080
showarded2afea2009-07-07 20:54:07 +00001081 def attach_to_existing_process(self, execution_path,
jamesrenc44ae992010-02-19 00:12:54 +00001082 pidfile_name=drone_manager.AUTOSERV_PID_FILE,
showardd1195652009-12-08 22:21:02 +00001083 num_processes=None):
showard170873e2009-01-07 00:22:26 +00001084 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001085 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001086 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +00001087 if num_processes is not None:
1088 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +00001089
1090
jadmanski0afbb632008-06-06 21:10:57 +00001091 def kill(self):
showard170873e2009-01-07 00:22:26 +00001092 if self.has_process():
1093 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001094
mbligh36768f02008-02-22 18:28:33 +00001095
showard170873e2009-01-07 00:22:26 +00001096 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001097 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001098 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001099
1100
showard170873e2009-01-07 00:22:26 +00001101 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001102 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001103 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001104 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001105
1106
showard170873e2009-01-07 00:22:26 +00001107 def _read_pidfile(self, use_second_read=False):
1108 assert self.pidfile_id is not None, (
1109 'You must call run() or attach_to_existing_process()')
1110 contents = _drone_manager.get_pidfile_contents(
1111 self.pidfile_id, use_second_read=use_second_read)
1112 if contents.is_invalid():
1113 self._state = drone_manager.PidfileContents()
1114 raise self._PidfileException(contents)
1115 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001116
1117
showard21baa452008-10-21 00:08:39 +00001118 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001119 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1120 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001121 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001122 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001123
1124
1125 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001126 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001127 return
mblighbb421852008-03-11 22:36:16 +00001128
showard21baa452008-10-21 00:08:39 +00001129 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001130
showard170873e2009-01-07 00:22:26 +00001131 if self._state.process is None:
1132 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001133 return
mbligh90a549d2008-03-25 23:52:34 +00001134
showard21baa452008-10-21 00:08:39 +00001135 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001136 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001137 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001138 return
mbligh90a549d2008-03-25 23:52:34 +00001139
showard170873e2009-01-07 00:22:26 +00001140 # pid but no running process - maybe process *just* exited
1141 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001142 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001143 # autoserv exited without writing an exit code
1144 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001145 self._handle_pidfile_error(
1146 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001147
showard21baa452008-10-21 00:08:39 +00001148
1149 def _get_pidfile_info(self):
1150 """\
1151 After completion, self._state will contain:
1152 pid=None, exit_status=None if autoserv has not yet run
1153 pid!=None, exit_status=None if autoserv is running
1154 pid!=None, exit_status!=None if autoserv has completed
1155 """
1156 try:
1157 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001158 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001159 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001160
1161
showard170873e2009-01-07 00:22:26 +00001162 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001163 """\
1164 Called when no pidfile is found or no pid is in the pidfile.
1165 """
showard170873e2009-01-07 00:22:26 +00001166 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001167 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001168 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001169 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001170 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001171
1172
showard35162b02009-03-03 02:17:30 +00001173 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001174 """\
1175 Called when autoserv has exited without writing an exit status,
1176 or we've timed out waiting for autoserv to write a pid to the
1177 pidfile. In either case, we just return failure and the caller
1178 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001179
showard170873e2009-01-07 00:22:26 +00001180 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001181 """
1182 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001183 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001184 self._state.exit_status = 1
1185 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001186
1187
jadmanski0afbb632008-06-06 21:10:57 +00001188 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001189 self._get_pidfile_info()
1190 return self._state.exit_status
1191
1192
1193 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001194 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001195 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001196 if self._state.num_tests_failed is None:
1197 return -1
showard21baa452008-10-21 00:08:39 +00001198 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001199
1200
showardcdaeae82009-08-31 18:32:48 +00001201 def try_copy_results_on_drone(self, **kwargs):
1202 if self.has_process():
1203 # copy results logs into the normal place for job results
1204 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1205
1206
1207 def try_copy_to_results_repository(self, source, **kwargs):
1208 if self.has_process():
1209 _drone_manager.copy_to_results_repository(self.get_process(),
1210 source, **kwargs)
1211
1212
mbligh36768f02008-02-22 18:28:33 +00001213class Agent(object):
showard77182562009-06-10 00:16:05 +00001214 """
Alex Miller47715eb2013-07-24 03:34:01 -07001215 An agent for use by the Dispatcher class to perform a task. An agent wraps
1216 around an AgentTask mainly to associate the AgentTask with the queue_entry
1217 and host ids.
showard77182562009-06-10 00:16:05 +00001218
1219 The following methods are required on all task objects:
1220 poll() - Called periodically to let the task check its status and
1221 update its internal state. If the task succeeded.
1222 is_done() - Returns True if the task is finished.
1223 abort() - Called when an abort has been requested. The task must
1224 set its aborted attribute to True if it actually aborted.
1225
1226 The following attributes are required on all task objects:
1227 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001228 success - bool, True if this task succeeded.
1229 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1230 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001231 """
1232
1233
showard418785b2009-11-23 20:19:59 +00001234 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001235 """
Alex Miller47715eb2013-07-24 03:34:01 -07001236 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001237 """
showard8cc058f2009-09-08 16:26:33 +00001238 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001239
showard77182562009-06-10 00:16:05 +00001240 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001241 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001242
showard8cc058f2009-09-08 16:26:33 +00001243 self.queue_entry_ids = task.queue_entry_ids
1244 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001245
showard8cc058f2009-09-08 16:26:33 +00001246 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001247 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001248
1249
jadmanski0afbb632008-06-06 21:10:57 +00001250 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001251 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001252 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001253 self.task.poll()
1254 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001255 self.finished = True
showardec113162008-05-08 00:52:49 +00001256
1257
jadmanski0afbb632008-06-06 21:10:57 +00001258 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001259 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001260
1261
showardd3dc1992009-04-22 21:01:40 +00001262 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001263 if self.task:
1264 self.task.abort()
1265 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001266 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001267 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001268
showardd3dc1992009-04-22 21:01:40 +00001269
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001270class BaseAgentTask(object):
showardd1195652009-12-08 22:21:02 +00001271 class _NullMonitor(object):
1272 pidfile_id = None
1273
1274 def has_process(self):
1275 return True
1276
1277
1278 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001279 """
showardd1195652009-12-08 22:21:02 +00001280 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001281 """
jadmanski0afbb632008-06-06 21:10:57 +00001282 self.done = False
showardd1195652009-12-08 22:21:02 +00001283 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001284 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001285 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001286 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001287 self.queue_entry_ids = []
1288 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001289 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001290
1291
1292 def _set_ids(self, host=None, queue_entries=None):
1293 if queue_entries and queue_entries != [None]:
1294 self.host_ids = [entry.host.id for entry in queue_entries]
1295 self.queue_entry_ids = [entry.id for entry in queue_entries]
1296 else:
1297 assert host
1298 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001299
1300
jadmanski0afbb632008-06-06 21:10:57 +00001301 def poll(self):
showard08a36412009-05-05 01:01:13 +00001302 if not self.started:
1303 self.start()
showardd1195652009-12-08 22:21:02 +00001304 if not self.done:
1305 self.tick()
showard08a36412009-05-05 01:01:13 +00001306
1307
1308 def tick(self):
showardd1195652009-12-08 22:21:02 +00001309 assert self.monitor
1310 exit_code = self.monitor.exit_code()
1311 if exit_code is None:
1312 return
mbligh36768f02008-02-22 18:28:33 +00001313
showardd1195652009-12-08 22:21:02 +00001314 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001315 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001316
1317
jadmanski0afbb632008-06-06 21:10:57 +00001318 def is_done(self):
1319 return self.done
mbligh36768f02008-02-22 18:28:33 +00001320
1321
jadmanski0afbb632008-06-06 21:10:57 +00001322 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001323 if self.done:
showardd1195652009-12-08 22:21:02 +00001324 assert self.started
showard08a36412009-05-05 01:01:13 +00001325 return
showardd1195652009-12-08 22:21:02 +00001326 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001327 self.done = True
1328 self.success = success
1329 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001330
1331
jadmanski0afbb632008-06-06 21:10:57 +00001332 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001333 """
1334 To be overridden.
1335 """
showarded2afea2009-07-07 20:54:07 +00001336 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001337 self.register_necessary_pidfiles()
1338
1339
1340 def _log_file(self):
1341 if not self._log_file_name:
1342 return None
1343 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001344
mbligh36768f02008-02-22 18:28:33 +00001345
jadmanski0afbb632008-06-06 21:10:57 +00001346 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001347 log_file = self._log_file()
1348 if self.monitor and log_file:
1349 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001350
1351
jadmanski0afbb632008-06-06 21:10:57 +00001352 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001353 """
1354 To be overridden.
1355 """
jadmanski0afbb632008-06-06 21:10:57 +00001356 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001357 logging.info("%s finished with success=%s", type(self).__name__,
1358 self.success)
1359
mbligh36768f02008-02-22 18:28:33 +00001360
1361
jadmanski0afbb632008-06-06 21:10:57 +00001362 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001363 if not self.started:
1364 self.prolog()
1365 self.run()
1366
1367 self.started = True
1368
1369
1370 def abort(self):
1371 if self.monitor:
1372 self.monitor.kill()
1373 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001374 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001375 self.cleanup()
1376
1377
showarded2afea2009-07-07 20:54:07 +00001378 def _get_consistent_execution_path(self, execution_entries):
1379 first_execution_path = execution_entries[0].execution_path()
1380 for execution_entry in execution_entries[1:]:
1381 assert execution_entry.execution_path() == first_execution_path, (
1382 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1383 execution_entry,
1384 first_execution_path,
1385 execution_entries[0]))
1386 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001387
1388
showarded2afea2009-07-07 20:54:07 +00001389 def _copy_results(self, execution_entries, use_monitor=None):
1390 """
1391 @param execution_entries: list of objects with execution_path() method
1392 """
showard6d1c1432009-08-20 23:30:39 +00001393 if use_monitor is not None and not use_monitor.has_process():
1394 return
1395
showarded2afea2009-07-07 20:54:07 +00001396 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001397 if use_monitor is None:
1398 assert self.monitor
1399 use_monitor = self.monitor
1400 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001401 execution_path = self._get_consistent_execution_path(execution_entries)
1402 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001403 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001404
showarda1e74b32009-05-12 17:32:04 +00001405
1406 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001407 for queue_entry in queue_entries:
1408 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001409
1410
mbligh4608b002010-01-05 18:22:35 +00001411 def _archive_results(self, queue_entries):
1412 for queue_entry in queue_entries:
1413 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001414
1415
showardd1195652009-12-08 22:21:02 +00001416 def _command_line(self):
1417 """
1418 Return the command line to run. Must be overridden.
1419 """
1420 raise NotImplementedError
1421
1422
1423 @property
1424 def num_processes(self):
1425 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001426 Return the number of processes forked by this BaseAgentTask's process.
1427 It may only be approximate. To be overridden if necessary.
showardd1195652009-12-08 22:21:02 +00001428 """
1429 return 1
1430
1431
1432 def _paired_with_monitor(self):
1433 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001434 If this BaseAgentTask's process must run on the same machine as some
showardd1195652009-12-08 22:21:02 +00001435 previous process, this method should be overridden to return a
1436 PidfileRunMonitor for that process.
1437 """
1438 return self._NullMonitor()
1439
1440
1441 @property
1442 def owner_username(self):
1443 """
1444 Return login of user responsible for this task. May be None. Must be
1445 overridden.
1446 """
1447 raise NotImplementedError
1448
1449
1450 def _working_directory(self):
1451 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001452 Return the directory where this BaseAgentTask's process executes.
1453 Must be overridden.
showardd1195652009-12-08 22:21:02 +00001454 """
1455 raise NotImplementedError
1456
1457
1458 def _pidfile_name(self):
1459 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001460 Return the name of the pidfile this BaseAgentTask's process uses. To be
showardd1195652009-12-08 22:21:02 +00001461 overridden if necessary.
1462 """
jamesrenc44ae992010-02-19 00:12:54 +00001463 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001464
1465
1466 def _check_paired_results_exist(self):
1467 if not self._paired_with_monitor().has_process():
1468 email_manager.manager.enqueue_notify_email(
1469 'No paired results in task',
1470 'No paired results in task %s at %s'
1471 % (self, self._paired_with_monitor().pidfile_id))
1472 self.finished(False)
1473 return False
1474 return True
1475
1476
1477 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001478 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001479 self.monitor = PidfileRunMonitor()
1480
1481
1482 def run(self):
1483 if not self._check_paired_results_exist():
1484 return
1485
1486 self._create_monitor()
1487 self.monitor.run(
1488 self._command_line(), self._working_directory(),
1489 num_processes=self.num_processes,
1490 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1491 pidfile_name=self._pidfile_name(),
1492 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001493 username=self.owner_username,
1494 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1495
1496
1497 def get_drone_hostnames_allowed(self):
1498 if not models.DroneSet.drone_sets_enabled():
1499 return None
1500
1501 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1502 if not hqes:
1503 # Only special tasks could be missing host queue entries
1504 assert isinstance(self, SpecialAgentTask)
1505 return self._user_or_global_default_drone_set(
1506 self.task, self.task.requested_by)
1507
1508 job_ids = hqes.values_list('job', flat=True).distinct()
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001509 assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
jamesren76fcf192010-04-21 20:39:50 +00001510 "span multiple jobs")
1511
1512 job = models.Job.objects.get(id=job_ids[0])
1513 drone_set = job.drone_set
1514 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001515 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001516
1517 return drone_set.get_drone_hostnames()
1518
1519
1520 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1521 """
1522 Returns the user's default drone set, if present.
1523
1524 Otherwise, returns the global default drone set.
1525 """
1526 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1527 if not user:
1528 logging.warn('%s had no owner; using default drone set',
1529 obj_with_owner)
1530 return default_hostnames
1531 if not user.drone_set:
1532 logging.warn('User %s has no default drone set, using global '
1533 'default', user.login)
1534 return default_hostnames
1535 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001536
1537
1538 def register_necessary_pidfiles(self):
1539 pidfile_id = _drone_manager.get_pidfile_id_from(
1540 self._working_directory(), self._pidfile_name())
1541 _drone_manager.register_pidfile(pidfile_id)
1542
1543 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1544 if paired_pidfile_id:
1545 _drone_manager.register_pidfile(paired_pidfile_id)
1546
1547
1548 def recover(self):
1549 if not self._check_paired_results_exist():
1550 return
1551
1552 self._create_monitor()
1553 self.monitor.attach_to_existing_process(
1554 self._working_directory(), pidfile_name=self._pidfile_name(),
1555 num_processes=self.num_processes)
1556 if not self.monitor.has_process():
1557 # no process to recover; wait to be started normally
1558 self.monitor = None
1559 return
1560
1561 self.started = True
Aviv Keshet225bdfe2013-03-05 10:10:08 -08001562 logging.info('Recovering process %s for %s at %s',
1563 self.monitor.get_process(), type(self).__name__,
1564 self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001565
1566
mbligh4608b002010-01-05 18:22:35 +00001567 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1568 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001569 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001570 for entry in queue_entries:
1571 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001572 raise host_scheduler.SchedulerError(
1573 '%s attempting to start entry with invalid status %s: '
1574 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001575 invalid_host_status = (
1576 allowed_host_statuses is not None
1577 and entry.host.status not in allowed_host_statuses)
1578 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001579 raise host_scheduler.SchedulerError(
1580 '%s attempting to start on queue entry with invalid '
1581 'host status %s: %s'
1582 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001583
1584
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001585SiteAgentTask = utils.import_site_class(
1586 __file__, 'autotest_lib.scheduler.site_monitor_db',
1587 'SiteAgentTask', BaseAgentTask)
1588
1589class AgentTask(SiteAgentTask):
1590 pass
1591
1592
showardd9205182009-04-27 20:09:55 +00001593class TaskWithJobKeyvals(object):
1594 """AgentTask mixin providing functionality to help with job keyval files."""
1595 _KEYVAL_FILE = 'keyval'
1596 def _format_keyval(self, key, value):
1597 return '%s=%s' % (key, value)
1598
1599
1600 def _keyval_path(self):
1601 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001602 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001603
1604
1605 def _write_keyval_after_job(self, field, value):
1606 assert self.monitor
1607 if not self.monitor.has_process():
1608 return
1609 _drone_manager.write_lines_to_file(
1610 self._keyval_path(), [self._format_keyval(field, value)],
1611 paired_with_process=self.monitor.get_process())
1612
1613
1614 def _job_queued_keyval(self, job):
1615 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1616
1617
1618 def _write_job_finished(self):
1619 self._write_keyval_after_job("job_finished", int(time.time()))
1620
1621
showarddb502762009-09-09 15:31:20 +00001622 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1623 keyval_contents = '\n'.join(self._format_keyval(key, value)
1624 for key, value in keyval_dict.iteritems())
1625 # always end with a newline to allow additional keyvals to be written
1626 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001627 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001628 keyval_contents,
1629 file_path=keyval_path)
1630
1631
1632 def _write_keyvals_before_job(self, keyval_dict):
1633 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1634
1635
1636 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001637 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001638 host.hostname)
1639 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001640 all_labels = [ urllib.quote(label) for label in all_labels ]
showarddb502762009-09-09 15:31:20 +00001641 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1642 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1643
1644
showard8cc058f2009-09-08 16:26:33 +00001645class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001646 """
1647 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1648 """
1649
1650 TASK_TYPE = None
1651 host = None
1652 queue_entry = None
1653
showardd1195652009-12-08 22:21:02 +00001654 def __init__(self, task, extra_command_args):
1655 super(SpecialAgentTask, self).__init__()
1656
lmrb7c5d272010-04-16 06:34:04 +00001657 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001658
jamesrenc44ae992010-02-19 00:12:54 +00001659 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001660 self.queue_entry = None
1661 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001662 self.queue_entry = scheduler_models.HostQueueEntry(
1663 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001664
showarded2afea2009-07-07 20:54:07 +00001665 self.task = task
1666 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001667
1668
showard8cc058f2009-09-08 16:26:33 +00001669 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001670 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1671
1672
1673 def _command_line(self):
1674 return _autoserv_command_line(self.host.hostname,
1675 self._extra_command_args,
1676 queue_entry=self.queue_entry)
1677
1678
1679 def _working_directory(self):
1680 return self.task.execution_path()
1681
1682
1683 @property
1684 def owner_username(self):
1685 if self.task.requested_by:
1686 return self.task.requested_by.login
1687 return None
showard8cc058f2009-09-08 16:26:33 +00001688
1689
showarded2afea2009-07-07 20:54:07 +00001690 def prolog(self):
1691 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001692 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001693 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001694
1695
showardde634ee2009-01-30 01:44:24 +00001696 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001697 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001698
showard2fe3f1d2009-07-06 20:19:11 +00001699 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001700 return # don't fail metahost entries, they'll be reassigned
1701
showard2fe3f1d2009-07-06 20:19:11 +00001702 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001703 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001704 return # entry has been aborted
1705
Alex Millerdfff2fd2013-05-28 13:05:06 -07001706 self._actually_fail_queue_entry()
1707
1708
1709 # TODO(milleral): http://crbug.com/268607
1710 # All this used to be a part of _fail_queue_entry. The
1711 # exact semantics of when one should and should not be failing a queue
1712 # entry need to be worked out, because provisioning has placed us in a
1713 # case where we want to fail a queue entry that could be requeued,
1714 # which makes us fail the two above if statements, and thus
1715 # _fail_queue_entry() would exit early and have no effect.
1716 # What's left here with _actually_fail_queue_entry is a hack to be able to
1717 # bypass the checks and unconditionally execute the code.
1718 def _actually_fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001719 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001720 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001721 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001722 self._write_keyval_after_job(queued_key, queued_time)
1723 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001724
showard8cc058f2009-09-08 16:26:33 +00001725 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001726 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001727 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001728 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001729
showard8cc058f2009-09-08 16:26:33 +00001730 pidfile_id = _drone_manager.get_pidfile_id_from(
1731 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001732 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001733 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001734
1735 if self.queue_entry.job.parse_failed_repair:
1736 self._parse_results([self.queue_entry])
1737 else:
1738 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001739
Alex Miller23676a22013-07-03 09:03:36 -07001740 # Also fail all other special tasks that have not yet run for this HQE
1741 pending_tasks = models.SpecialTask.objects.filter(
1742 queue_entry__id=self.queue_entry.id,
1743 is_complete=0)
Alex Miller5e36ccc2013-08-03 16:31:58 -07001744 for task in pending_tasks:
1745 task.finish(False)
Alex Miller23676a22013-07-03 09:03:36 -07001746
showard8cc058f2009-09-08 16:26:33 +00001747
1748 def cleanup(self):
1749 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001750
1751 # We will consider an aborted task to be "Failed"
1752 self.task.finish(bool(self.success))
1753
showardf85a0b72009-10-07 20:48:45 +00001754 if self.monitor:
1755 if self.monitor.has_process():
1756 self._copy_results([self.task])
1757 if self.monitor.pidfile_id is not None:
1758 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001759
1760
Dan Shi07e09af2013-04-12 09:31:29 -07001761 def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
1762 """Remove a type of special task in all tasks, keep last one if needed.
1763
1764 @param special_task_to_remove: type of special task to be removed, e.g.,
1765 models.SpecialTask.Task.VERIFY.
1766 @param keep_last_one: True to keep the last special task if its type is
1767 the same as of special_task_to_remove.
1768
1769 """
1770 queued_special_tasks = models.SpecialTask.objects.filter(
1771 host__id=self.host.id,
1772 task=special_task_to_remove,
1773 is_active=False, is_complete=False, queue_entry=None)
1774 if keep_last_one:
1775 queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
1776 queued_special_tasks.delete()
1777
1778
showard8cc058f2009-09-08 16:26:33 +00001779class RepairTask(SpecialAgentTask):
1780 TASK_TYPE = models.SpecialTask.Task.REPAIR
1781
1782
showardd1195652009-12-08 22:21:02 +00001783 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001784 """\
1785 queue_entry: queue entry to mark failed if this repair fails.
1786 """
1787 protection = host_protections.Protection.get_string(
1788 task.host.protection)
1789 # normalize the protection name
1790 protection = host_protections.Protection.get_attr_name(protection)
1791
1792 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001793 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001794
1795 # *don't* include the queue entry in IDs -- if the queue entry is
1796 # aborted, we want to leave the repair task running
1797 self._set_ids(host=self.host)
1798
1799
1800 def prolog(self):
1801 super(RepairTask, self).prolog()
1802 logging.info("repair_task starting")
1803 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001804
1805
jadmanski0afbb632008-06-06 21:10:57 +00001806 def epilog(self):
1807 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001808
jadmanski0afbb632008-06-06 21:10:57 +00001809 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001810 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001811 else:
showard8cc058f2009-09-08 16:26:33 +00001812 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001813 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001814 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001815
1816
showarded2afea2009-07-07 20:54:07 +00001817class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001818 def _copy_to_results_repository(self):
1819 if not self.queue_entry or self.queue_entry.meta_host:
1820 return
1821
1822 self.queue_entry.set_execution_subdir()
1823 log_name = os.path.basename(self.task.execution_path())
1824 source = os.path.join(self.task.execution_path(), 'debug',
1825 'autoserv.DEBUG')
1826 destination = os.path.join(
1827 self.queue_entry.execution_path(), log_name)
1828
1829 self.monitor.try_copy_to_results_repository(
1830 source, destination_path=destination)
1831
1832
showard170873e2009-01-07 00:22:26 +00001833 def epilog(self):
1834 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001835
showard775300b2009-09-09 15:30:50 +00001836 if self.success:
1837 return
showard8fe93b52008-11-18 17:53:22 +00001838
showard775300b2009-09-09 15:30:50 +00001839 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001840
showard775300b2009-09-09 15:30:50 +00001841 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001842 # effectively ignore failure for these hosts
1843 self.success = True
showard775300b2009-09-09 15:30:50 +00001844 return
1845
1846 if self.queue_entry:
1847 self.queue_entry.requeue()
Alex Millerf3f19452013-07-29 15:53:00 -07001848 # If we requeue a HQE, we should cancel any remaining pre-job
1849 # tasks against this host, otherwise we'll be left in a state
1850 # where a queued HQE has special tasks to run against a host.
1851 models.SpecialTask.objects.filter(
1852 queue_entry__id=self.queue_entry.id,
1853 host__id=self.host.id,
1854 is_complete=0).update(is_complete=1, success=0)
showard775300b2009-09-09 15:30:50 +00001855
1856 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001857 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001858 queue_entry__id=self.queue_entry.id):
1859 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1860 self._fail_queue_entry()
1861 return
1862
showard9bb960b2009-11-19 01:02:11 +00001863 queue_entry = models.HostQueueEntry.objects.get(
1864 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001865 else:
1866 queue_entry = None
1867
1868 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001869 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001870 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001871 queue_entry=queue_entry,
1872 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001873
showard8fe93b52008-11-18 17:53:22 +00001874
Alex Miller42437f92013-05-28 12:58:54 -07001875 def _should_pending(self):
1876 """
1877 Decide if we should call the host queue entry's on_pending method.
1878 We should if:
1879 1) There exists an associated host queue entry.
1880 2) The current special task completed successfully.
1881 3) There do not exist any more special tasks to be run before the
1882 host queue entry starts.
1883
1884 @returns: True if we should call pending, false if not.
1885
1886 """
1887 if not self.queue_entry or not self.success:
1888 return False
1889
1890 # We know if this is the last one when we create it, so we could add
1891 # another column to the database to keep track of this information, but
1892 # I expect the overhead of querying here to be minimal.
1893 queue_entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1894 queued = models.SpecialTask.objects.filter(
1895 host__id=self.host.id, is_active=False,
1896 is_complete=False, queue_entry=queue_entry)
1897 queued = queued.exclude(id=self.task.id)
1898 return queued.count() == 0
1899
1900
showard8fe93b52008-11-18 17:53:22 +00001901class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001902 TASK_TYPE = models.SpecialTask.Task.VERIFY
1903
1904
showardd1195652009-12-08 22:21:02 +00001905 def __init__(self, task):
1906 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001907 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001908
1909
jadmanski0afbb632008-06-06 21:10:57 +00001910 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001911 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001912
showardb18134f2009-03-20 20:52:18 +00001913 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001914 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001915 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1916 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001917
jamesren42318f72010-05-10 23:40:59 +00001918 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00001919 # and there's no need to keep records of other requests.
Dan Shi07e09af2013-04-12 09:31:29 -07001920 self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
1921 keep_last_one=True)
showard2fe3f1d2009-07-06 20:19:11 +00001922
mbligh36768f02008-02-22 18:28:33 +00001923
jadmanski0afbb632008-06-06 21:10:57 +00001924 def epilog(self):
1925 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001926 if self.success:
Alex Miller42437f92013-05-28 12:58:54 -07001927 if self._should_pending():
showard8cc058f2009-09-08 16:26:33 +00001928 self.queue_entry.on_pending()
1929 else:
1930 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001931
1932
mbligh4608b002010-01-05 18:22:35 +00001933class CleanupTask(PreJobTask):
1934 # note this can also run post-job, but when it does, it's running standalone
1935 # against the host (not related to the job), so it's not considered a
1936 # PostJobTask
1937
1938 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1939
1940
1941 def __init__(self, task, recover_run_monitor=None):
1942 super(CleanupTask, self).__init__(task, ['--cleanup'])
1943 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1944
1945
1946 def prolog(self):
1947 super(CleanupTask, self).prolog()
1948 logging.info("starting cleanup task for host: %s", self.host.hostname)
1949 self.host.set_status(models.Host.Status.CLEANING)
1950 if self.queue_entry:
Dan Shi07e09af2013-04-12 09:31:29 -07001951 self.queue_entry.set_status(models.HostQueueEntry.Status.CLEANING)
mbligh4608b002010-01-05 18:22:35 +00001952
1953
1954 def _finish_epilog(self):
1955 if not self.queue_entry or not self.success:
1956 return
1957
1958 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
1959 should_run_verify = (
1960 self.queue_entry.job.run_verify
1961 and self.host.protection != do_not_verify_protection)
1962 if should_run_verify:
1963 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1964 models.SpecialTask.objects.create(
1965 host=models.Host.objects.get(id=self.host.id),
1966 queue_entry=entry,
1967 task=models.SpecialTask.Task.VERIFY)
1968 else:
Alex Miller42437f92013-05-28 12:58:54 -07001969 if self._should_pending():
1970 self.queue_entry.on_pending()
mbligh4608b002010-01-05 18:22:35 +00001971
1972
1973 def epilog(self):
1974 super(CleanupTask, self).epilog()
1975
1976 if self.success:
1977 self.host.update_field('dirty', 0)
1978 self.host.set_status(models.Host.Status.READY)
1979
1980 self._finish_epilog()
1981
1982
class ResetTask(PreJobTask):
    """Task to reset a DUT, including cleanup and verify."""
    # note this can also run post-job, but when it does, it's running
    # standalone against the host (not related to the job), so it's not
    # considered a PostJobTask

    TASK_TYPE = models.SpecialTask.Task.RESET


    def __init__(self, task, recover_run_monitor=None):
        super(ResetTask, self).__init__(task, ['--reset'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        """Mark host/HQE as resetting and drop now-redundant queued tasks."""
        super(ResetTask, self).prolog()
        logging.info('starting reset task for host: %s',
                     self.host.hostname)
        self.host.set_status(models.Host.Status.RESETTING)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.RESETTING)

        # A reset subsumes queued cleanups and reverifies, so delete them;
        # multiple queued resets collapse down to a single remaining one.
        for task_type, keep_last in (
                (models.SpecialTask.Task.CLEANUP, False),
                (models.SpecialTask.Task.VERIFY, False),
                (models.SpecialTask.Task.RESET, True)):
            self.remove_special_tasks(task_type, keep_last_one=keep_last)


    def epilog(self):
        """On success, clear the dirty bit and move the entry/host along."""
        super(ResetTask, self).epilog()

        if self.success:
            self.host.update_field('dirty', 0)

            if self._should_pending():
                self.queue_entry.on_pending()
            else:
                self.host.set_status(models.Host.Status.READY)
Dan Shi07e09af2013-04-12 09:31:29 -07002028
2029
class ProvisionTask(PreJobTask):
    """Pre-job special task that runs autoserv --provision against a host.

    The provisionable subset of the job's dependency labels is passed to
    autoserv so the host can be brought into the state the job requires
    before the job itself runs.
    """

    TASK_TYPE = models.SpecialTask.Task.PROVISION

    def __init__(self, task):
        """
        @param task: afe models.SpecialTask row for this provision run; it
                must have an associated queue entry.
        """
        # Provisioning requires that we be associated with a job/queue entry
        assert task.queue_entry, "No HQE associated with provision task!"
        # task.queue_entry is an afe model HostQueueEntry object.
        # self.queue_entry is a scheduler models HostQueueEntry object, but
        # it gets constructed and assigned in __init__, so it's not available
        # yet. Therefore, we're stuck pulling labels off of the afe model
        # so that we can pass the --provision args into the __init__ call.
        labels = {x.name for x in task.queue_entry.job.dependency_labels.all()}
        _, provisionable = provision.filter_labels(labels)
        extra_command_args = ['--provision', ','.join(provisionable)]
        super(ProvisionTask, self).__init__(task, extra_command_args)
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def _command_line(self):
        """Build the autoserv command line (host + --provision args only)."""
        # If we give queue_entry to _autoserv_command_line, then it will append
        # -c for this invocation if the queue_entry is a client side test. We
        # don't want that, as it messes with provisioning, so we just drop it
        # from the arguments here.
        # Note that we also don't verify job_repo_url as provisioining tasks are
        # required to stage whatever content we need, and the job itself will
        # force autotest to be staged if it isn't already.
        return _autoserv_command_line(self.host.hostname,
                                      self._extra_command_args)


    def prolog(self):
        """Mark both the queue entry and the host as provisioning."""
        super(ProvisionTask, self).prolog()
        # TODO: add check for previous provision task and abort if one exists.
        logging.info("starting provision task for host: %s", self.host.hostname)
        self.queue_entry.set_status(
                models.HostQueueEntry.Status.PROVISIONING)
        self.host.set_status(models.Host.Status.PROVISIONING)


    def epilog(self):
        """Fail the HQE and queue a repair on failure; otherwise proceed."""
        # TODO(milleral) Here, we override the PreJobTask's epilog, because
        # it's written with the idea that pre-job special task failures are a
        # problem with the host and not with something about the HQE.
        # In our case, the HQE's DEPENDENCIES specify what the provision task
        # does, so if the provision fails, it can be the fault of the HQE, and
        # thus we fail the HQE. This difference is handled only here for now,
        # but some refactoring of PreJobTask should likely happen sometime in
        # the future?
        # This call is needed to log the status and call into self.cleanup(),
        # which is PreJobTasks's cleanup, which marks is_complete=1.
        AgentTask.epilog(self)

        if not self.success:
            # TODO(milleral) http://crbug.com/231452
            # In our own setup, we don't really use the results
            # repository, so I *think* this call can be elided. However, I'd
            # like to limit what I can possibly break for now, and it would be
            # called if I called PreJobTask's epilog, so I'm keeping the call
            # to it for now.
            self._copy_to_results_repository()
            # _actually_fail_queue_entry() is a hack around the fact that we do
            # indeed want to abort the queue entry here, but the rest of the
            # scheduler code expects that we will reschedule onto some other
            # host.
            self._actually_fail_queue_entry()
            # This abort will mark the aborted bit on the HQE itself, to
            # signify that we're killing it. Technically it also will do
            # the recursive aborting of all child jobs, but that shouldn't
            # matter here, as only suites have children, and those
            # are hostless and thus don't have provisioning.
            # TODO(milleral) http://crbug.com/188217
            # However, we can't actually do this yet, as if we set the abort bit
            # the FinalReparseTask will set the status of the HQE to ABORTED,
            # which then means that we don't show the status in run_suite.
            # So in the meantime, don't mark the HQE as aborted.
            queue_entry = models.HostQueueEntry.objects.get(
                    id=self.queue_entry.id)
            # queue_entry.abort()

            # The machine is in some totally unknown state, so let's kick off
            # a repair task to get it back to some known sane state.
            models.SpecialTask.objects.create(
                    host=models.Host.objects.get(id=self.host.id),
                    task=models.SpecialTask.Task.REPAIR,
                    queue_entry=queue_entry,
                    requested_by=self.task.requested_by)
        elif self._should_pending():
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
2120
2121
class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
    """
    Common functionality for QueueTask and HostlessQueueTask
    """
    def __init__(self, queue_entries):
        """
        @param queue_entries: scheduler-model HostQueueEntry objects; the
                job is taken from the first entry, so all entries are
                expected to belong to the same job.
        """
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries


    def _keyval_path(self):
        """Return the path of the keyval file in the execution directory."""
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _write_control_file(self, execution_path):
        """Attach the job's control file to the execution; return its path."""
        control_path = _drone_manager.attach_file_to_execution(
                execution_path, self.job.control_file)
        return control_path


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _command_line(self):
        """Build the autoserv command line for the job's main process."""
        execution_path = self.queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        # Hostless entries contribute no hostname to the -m list.
        hostnames = ','.join(entry.host.hostname
                             for entry in self.queue_entries
                             if not entry.is_hostless())

        execution_tag = self.queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self.job, verbose=False)
        if self.job.is_image_update_job():
            params += ['--image', self.job.update_image_path]

        return params


    @property
    def num_processes(self):
        # One process slot per queue entry in this job group.
        return len(self.queue_entries)


    @property
    def owner_username(self):
        return self.job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def prolog(self):
        """Write pre-job keyvals and mark all queue entries as running."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = self.job.keyval_dict()
        keyval_dict[queued_key] = queued_time
        group_name = self.queue_entries[0].get_group_name()
        if group_name:
            keyval_dict['host_group_name'] = group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.set_started_on_now()


    def _write_lost_process_error_file(self):
        """Drop a 'job_failure' file recording that the process was lost."""
        error_file_path = os.path.join(self._working_directory(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Record job-finished keyvals; no-op if the process never started."""
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()


    def _write_status_comment(self, comment):
        """Append an INFO comment line to the job's status.log."""
        _drone_manager.write_lines_to_file(
            os.path.join(self._working_directory(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job and when (keyvals + status.log)."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        # TODO(showard): this conditional is now obsolete, we just need to leave
        # it in temporarily for backwards compatibility over upgrades. delete
        # soon.
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
                aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        """Abort the process, then log the abort and finish up keyvals."""
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002254
2255
class QueueTask(AbstractQueueTask):
    """Runs a job's main autoserv process against one or more hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Sanity-check statuses, then mark hosts as running and dirty."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)


    def _finish_task(self):
        """Move entries to GATHERING; hosts stay RUNNING for log collection."""
        super(QueueTask, self)._finish_task()

        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        # On top of the standard invocation, have autoserv check the host's
        # job_repo_url.
        return super(QueueTask, self)._command_line() + [
                '--verify_job_repo_url']
2289
2290
class HostlessQueueTask(AbstractQueueTask):
    """Runs the autoserv process for a hostless (server-side only) job."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        # There is no host to derive an execution subdir from, so use a
        # fixed one.
        self.queue_entries[0].update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        # Hostless jobs skip gathering and go straight to parsing.
        super(HostlessQueueTask, self)._finish_task()
        self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00002305
2306
class PostJobTask(AgentTask):
    """Base class for tasks run over a finished job's results directory
    (log gathering, parsing, archiving), paired with the job's original
    autoserv process through its pidfile.
    """

    def __init__(self, queue_entries, log_file_name):
        """
        @param queue_entries: queue entries whose shared execution path this
                task operates on.
        @param log_file_name: name of this task's own log file.
        """
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        # Monitor attached to the job's (possibly finished) autoserv
        # process; used below to read its exit status.
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        # In testing mode, substitute a no-op command for the real one.
        if _testing_mode:
            return 'true'
        return self._generate_command(
                _drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        """Return the command to run over results_dir (argv list)."""
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        """Return True if the job's queue entries were aborted.

        All entries are expected to agree; if they disagree, a warning
        email is sent and the job is treated as aborted.
        """
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state:\n' +
                        '\n'.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        """Map the job's outcome to the final HostQueueEntry status."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        """Set every queue entry in this task to the given status."""
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]
2384
2385
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        """
        @param queue_entries: queue entries of the finished job; the job is
                taken from the first entry.
        """
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
            queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        """Build the autoserv --collect-crashinfo command line."""
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [_autoserv_path , '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        # One process slot per queue entry in the job group.
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        """Sanity-check that all entries/hosts are in gatherable states."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        """Kick off parsing of results and reboot/free the hosts."""
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        """Decide per-job whether to clean up (reboot) hosts or mark ready.

        Reboots happen for aborted jobs, jobs with RebootAfter.ALWAYS, jobs
        with IF_ALL_TESTS_PASSED that fully passed, and any job that had
        test failures.
        """
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            # No autoserv process was found -- treat the run as failed.
            final_success = False
            num_tests_failed = 0
        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0)
                or num_tests_failed > 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        """Run crashinfo collection only if autoserv died abnormally."""
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002472
2473
class SelfThrottledPostJobTask(PostJobTask):
    """
    Special AgentTask subclass that maintains its own global process limit.

    Subclasses supply _max_processes(); an instance refuses to launch its
    process while the class-wide running count is at that limit, and keeps
    retrying on every tick until a slot frees up.
    """

    # Class-wide count of currently-running processes for this task type.
    _num_running_processes = 0


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # Until our process has actually started, every tick is another
        # attempt to claim a slot; once started, defer to normal behavior.
        if not self._process_started():
            self._try_starting_process()
        else:
            super(SelfThrottledPostJobTask, self).tick()


    def run(self):
        # Don't launch unconditionally -- only start when under the limit.
        self._try_starting_process()


    def _try_starting_process(self):
        if not self._can_run_new_process():
            return

        # Launch the real process via PostJobTask.run() and, if it actually
        # came up, count it against the class-wide limit.
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002533
showard21baa452008-10-21 00:08:39 +00002534
class FinalReparseTask(SelfThrottledPostJobTask):
    """Runs the results parser over a finished job's results directory."""

    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                results_dir]


    @property
    def num_processes(self):
        # Parser processes are throttled by this class itself, so keep them
        # out of the drone manager's process accounting.
        return 0


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002573
2574
class ArchiveResultsTask(SelfThrottledPostJobTask):
    """Runs the archiving control file over a finished job's results."""

    # Marker file written into the results dir when archiving fails.
    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(
                queue_entries, log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        control_file = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                                    'archive_results.control.srv')
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                control_file]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            # Leave a marker file recording why archiving failed.
            marker = os.path.join(self._working_directory(),
                                  self._ARCHIVING_FAILED_FILE)
            _drone_manager.write_lines_to_file(
                    marker,
                    ['Archiving failed with exit code %s'
                     % self.monitor.exit_code()],
                    paired_with_process=self._paired_with_monitor().get_process())
        self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002622
2623
if __name__ == '__main__':
    # Script entry point; main() is defined earlier in this file.
    main()