#!/usr/bin/python -u
#pylint: disable-msg=C0111

"""
Autotest scheduler
"""


import datetime, optparse, os, signal
import sys, time, traceback, urllib
import logging, gc

import common
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.server import autoserv_utils
from autotest_lib.server.cros import provision
from autotest_lib.site_utils.graphite import stats

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_directory = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server')
_autoserv_path = os.path.join(_autoserv_directory, 'autoserv')
_testing_mode = False
_drone_manager = None

def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60


def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
        logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to look up the Job object.
    """
    return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
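
# Usage sketch (illustrative only; assumes a pending HostQueueEntry `entry`
# whose host and job are already populated):
#
#     command = _autoserv_command_line(machines=entry.host.hostname,
#                                      extra_args=[], job=entry.job,
#                                      queue_entry=entry)
#
# The resulting list of executable + arguments is the kind of command that
# PidfileRunMonitor.run() (defined below) expects.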


class BaseDispatcher(object):


    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def _log_extra_msg(self, msg):
        if self._extra_debugging:
            logging.debug(msg)


    def tick(self):
        """
        This is an altered version of tick() that logs when each major step
        begins, so we can try to figure out where most of the tick time is
        being spent.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        # host queue entry statuses handled directly by AgentTasks (Verifying
        # is handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask,
                                      ResetTask, ProvisionTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                           % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify, then reset, then provision.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY,
                               models.SpecialTask.Task.RESET,
                               models.SpecialTask.Task.PROVISION]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)


628 """
629 Execute queued SpecialTasks that are ready to run on idle hosts.
630 """
631 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000632 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000633 continue
showardd1195652009-12-08 22:21:02 +0000634 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000635
636
showard170873e2009-01-07 00:22:26 +0000637 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000638 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000639 # should never happen
showarded2afea2009-07-07 20:54:07 +0000640 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000641 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000642 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700643 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000644 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000645
646
jadmanski0afbb632008-06-06 21:10:57 +0000647 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000648 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700649 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000650 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000651 if self.host_has_agent(host):
652 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000653 continue
showard8cc058f2009-09-08 16:26:33 +0000654 if self._host_has_scheduled_special_task(host):
655 # host will have a special task scheduled on the next cycle
656 continue
showard170873e2009-01-07 00:22:26 +0000657 if print_message:
showardb18134f2009-03-20 20:52:18 +0000658 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000659 models.SpecialTask.objects.create(
660 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000661 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000662
663
jadmanski0afbb632008-06-06 21:10:57 +0000664 def _recover_hosts(self):
665 # recover "Repair Failed" hosts
666 message = 'Reverifying dead host %s'
667 self._reverify_hosts_where("status = 'Repair Failed'",
668 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000669
670
showard04c82c52008-05-29 19:38:12 +0000671
showardb95b1bd2008-08-15 18:11:04 +0000672 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000673 # prioritize by job priority, then non-metahost over metahost, then FIFO
jamesrenc44ae992010-02-19 00:12:54 +0000674 return list(scheduler_models.HostQueueEntry.fetch(
showardeab66ce2009-12-23 00:03:56 +0000675 joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000676 where='NOT complete AND NOT active AND status="Queued"',
showardeab66ce2009-12-23 00:03:56 +0000677 order_by='afe_jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000678
679
showard89f84db2009-03-12 20:39:13 +0000680 def _refresh_pending_queue_entries(self):
681 """
682 Lookup the pending HostQueueEntries and call our HostScheduler
683 refresh() method given that list. Return the list.
684
685 @returns A list of pending HostQueueEntries sorted in priority order.
686 """
showard63a34772008-08-18 19:32:50 +0000687 queue_entries = self._get_pending_queue_entries()
688 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000689 return []
showardb95b1bd2008-08-15 18:11:04 +0000690
showard63a34772008-08-18 19:32:50 +0000691 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000692
showard89f84db2009-03-12 20:39:13 +0000693 return queue_entries
694
695
    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        logging.debug('Processing %d queue_entries', len(queue_entries))
        for queue_entry in queue_entries:
            self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._log_extra_msg('Scheduling hostless job.')
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)


    def _schedule_running_host_queue_entries(self):
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
                            queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True


    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_cycle += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes. %d added this cycle.',
                     _drone_manager.total_running_processes(),
                     num_started_this_cycle)


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


SiteDispatcher = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    pass


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, num_processes, nice_level=None,
            log_file=None, pidfile_name=None, paired_with_pidfile=None,
            username=None, drone_hostnames_allowed=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            num_processes=num_processes, log_file=log_file,
            paired_with_pidfile=paired_with_pidfile, username=username,
            drone_hostnames_allowed=drone_hostnames_allowed)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=drone_manager.AUTOSERV_PID_FILE,
                                   num_processes=None):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_path, pidfile_name=pidfile_name)
        if num_processes is not None:
            _drone_manager.declare_process_count(self.pidfile_id, num_processes)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        if time.time() - self._start_time > _get_pidfile_timeout_secs():
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """@returns The number of tests that failed or -1 if unknown."""
        self._get_pidfile_info()
        if self._state.num_tests_failed is None:
            return -1
        return self._state.num_tests_failed


    def try_copy_results_on_drone(self, **kwargs):
        if self.has_process():
            # copy results logs into the normal place for job results
            _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)


    def try_copy_to_results_repository(self, source, **kwargs):
        if self.has_process():
            _drone_manager.copy_to_results_repository(self.get_process(),
                                                      source, **kwargs)


class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.  An agent wraps
    around an AgentTask mainly to associate the AgentTask with the queue_entry
    and host ids.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested.  The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """
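
    # Minimal sketch of an object satisfying the interface above (illustrative
    # only; real tasks are AgentTask subclasses defined later in this module):
    #
    #     class NoopTask(object):
    #         def __init__(self):
    #             self.success = False
    #             self.aborted = False
    #             self.queue_entry_ids = []
    #             self.host_ids = []
    #             self._finished = False
    #         def poll(self):
    #             self.success = True
    #             self._finished = True
    #         def is_done(self):
    #             return self._finished
    #         def abort(self):
    #             self.aborted = True
    #
    # Such a task would be handed to Dispatcher.add_agent_task(), which wraps
    # it in an Agent and drives it via tick().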
1087
1088
showard418785b2009-11-23 20:19:59 +00001089 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001090 """
Alex Miller47715eb2013-07-24 03:34:01 -07001091 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001092 """
showard8cc058f2009-09-08 16:26:33 +00001093 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001094
showard77182562009-06-10 00:16:05 +00001095 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001096 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001097
showard8cc058f2009-09-08 16:26:33 +00001098 self.queue_entry_ids = task.queue_entry_ids
1099 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001100
showard8cc058f2009-09-08 16:26:33 +00001101 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001102 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001103
1104
jadmanski0afbb632008-06-06 21:10:57 +00001105 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001106 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001107 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001108 self.task.poll()
1109 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001110 self.finished = True
showardec113162008-05-08 00:52:49 +00001111
1112
jadmanski0afbb632008-06-06 21:10:57 +00001113 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001114 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001115
1116
showardd3dc1992009-04-22 21:01:40 +00001117 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001118 if self.task:
1119 self.task.abort()
1120 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001121 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001122 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001123
showardd3dc1992009-04-22 21:01:40 +00001124
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001125class BaseAgentTask(object):
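# _NullMonitor stands in for a paired PidfileRunMonitor when a task has no
# predecessor process; its has_process() always returns True, so the
# paired-results check passes trivially.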
showardd1195652009-12-08 22:21:02 +00001126 class _NullMonitor(object):
1127 pidfile_id = None
1128
1129 def has_process(self):
1130 return True
1131
1132
1133 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001134 """
showardd1195652009-12-08 22:21:02 +00001135 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001136 """
jadmanski0afbb632008-06-06 21:10:57 +00001137 self.done = False
showardd1195652009-12-08 22:21:02 +00001138 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001139 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001140 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001141 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001142 self.queue_entry_ids = []
1143 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001144 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001145
1146
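# Records which host and queue-entry ids this task touches, taken either from
# the given queue entries or, failing that, from a single host.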
1147 def _set_ids(self, host=None, queue_entries=None):
1148 if queue_entries and queue_entries != [None]:
1149 self.host_ids = [entry.host.id for entry in queue_entries]
1150 self.queue_entry_ids = [entry.id for entry in queue_entries]
1151 else:
1152 assert host
1153 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001154
1155
jadmanski0afbb632008-06-06 21:10:57 +00001156 def poll(self):
showard08a36412009-05-05 01:01:13 +00001157 if not self.started:
1158 self.start()
showardd1195652009-12-08 22:21:02 +00001159 if not self.done:
1160 self.tick()
showard08a36412009-05-05 01:01:13 +00001161
1162
1163 def tick(self):
showardd1195652009-12-08 22:21:02 +00001164 assert self.monitor
1165 exit_code = self.monitor.exit_code()
1166 if exit_code is None:
1167 return
mbligh36768f02008-02-22 18:28:33 +00001168
showardd1195652009-12-08 22:21:02 +00001169 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001170 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001171
1172
jadmanski0afbb632008-06-06 21:10:57 +00001173 def is_done(self):
1174 return self.done
mbligh36768f02008-02-22 18:28:33 +00001175
1176
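# Marks the task as finished with the given success value and runs epilog();
# calling it again once the task is already done is a no-op.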
jadmanski0afbb632008-06-06 21:10:57 +00001177 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001178 if self.done:
showardd1195652009-12-08 22:21:02 +00001179 assert self.started
showard08a36412009-05-05 01:01:13 +00001180 return
showardd1195652009-12-08 22:21:02 +00001181 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001182 self.done = True
1183 self.success = success
1184 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001185
1186
jadmanski0afbb632008-06-06 21:10:57 +00001187 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001188 """
1189 To be overridden.
1190 """
showarded2afea2009-07-07 20:54:07 +00001191 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001192 self.register_necessary_pidfiles()
1193
1194
1195 def _log_file(self):
1196 if not self._log_file_name:
1197 return None
1198 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001199
mbligh36768f02008-02-22 18:28:33 +00001200
jadmanski0afbb632008-06-06 21:10:57 +00001201 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001202 log_file = self._log_file()
1203 if self.monitor and log_file:
1204 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001205
1206
jadmanski0afbb632008-06-06 21:10:57 +00001207 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001208 """
1209 To be overridden.
1210 """
jadmanski0afbb632008-06-06 21:10:57 +00001211 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001212 logging.info("%s finished with success=%s", type(self).__name__,
1213 self.success)
1214
mbligh36768f02008-02-22 18:28:33 +00001215
1216
jadmanski0afbb632008-06-06 21:10:57 +00001217 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001218 if not self.started:
1219 self.prolog()
1220 self.run()
1221
1222 self.started = True
1223
1224
1225 def abort(self):
1226 if self.monitor:
1227 self.monitor.kill()
1228 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001229 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001230 self.cleanup()
1231
1232
showarded2afea2009-07-07 20:54:07 +00001233 def _get_consistent_execution_path(self, execution_entries):
1234 first_execution_path = execution_entries[0].execution_path()
1235 for execution_entry in execution_entries[1:]:
1236 assert execution_entry.execution_path() == first_execution_path, (
1237 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1238 execution_entry,
1239 first_execution_path,
1240 execution_entries[0]))
1241 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001242
1243
showarded2afea2009-07-07 20:54:07 +00001244 def _copy_results(self, execution_entries, use_monitor=None):
1245 """
1246 @param execution_entries: list of objects with execution_path() method
1247 """
showard6d1c1432009-08-20 23:30:39 +00001248 if use_monitor is not None and not use_monitor.has_process():
1249 return
1250
showarded2afea2009-07-07 20:54:07 +00001251 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001252 if use_monitor is None:
1253 assert self.monitor
1254 use_monitor = self.monitor
1255 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001256 execution_path = self._get_consistent_execution_path(execution_entries)
1257 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001258 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001259
showarda1e74b32009-05-12 17:32:04 +00001260
1261 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001262 for queue_entry in queue_entries:
1263 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001264
1265
mbligh4608b002010-01-05 18:22:35 +00001266 def _archive_results(self, queue_entries):
1267 for queue_entry in queue_entries:
1268 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001269
1270
showardd1195652009-12-08 22:21:02 +00001271 def _command_line(self):
1272 """
1273 Return the command line to run. Must be overridden.
1274 """
1275 raise NotImplementedError
1276
1277
1278 @property
1279 def num_processes(self):
1280 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001281 Return the number of processes forked by this BaseAgentTask's process.
1282 It may only be approximate. To be overridden if necessary.
showardd1195652009-12-08 22:21:02 +00001283 """
1284 return 1
1285
1286
1287 def _paired_with_monitor(self):
1288 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001289 If this BaseAgentTask's process must run on the same machine as some
showardd1195652009-12-08 22:21:02 +00001290 previous process, this method should be overridden to return a
1291 PidfileRunMonitor for that process.
1292 """
1293 return self._NullMonitor()
1294
1295
1296 @property
1297 def owner_username(self):
1298 """
1299 Return login of user responsible for this task. May be None. Must be
1300 overridden.
1301 """
1302 raise NotImplementedError
1303
1304
1305 def _working_directory(self):
1306 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001307 Return the directory where this BaseAgentTask's process executes.
1308 Must be overridden.
showardd1195652009-12-08 22:21:02 +00001309 """
1310 raise NotImplementedError
1311
1312
1313 def _pidfile_name(self):
1314 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001315 Return the name of the pidfile this BaseAgentTask's process uses. To be
showardd1195652009-12-08 22:21:02 +00001316 overridden if necessary.
1317 """
jamesrenc44ae992010-02-19 00:12:54 +00001318 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001319
1320
1321 def _check_paired_results_exist(self):
1322 if not self._paired_with_monitor().has_process():
1323 email_manager.manager.enqueue_notify_email(
1324 'No paired results in task',
1325 'No paired results in task %s at %s'
1326 % (self, self._paired_with_monitor().pidfile_id))
1327 self.finished(False)
1328 return False
1329 return True
1330
1331
1332 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001333 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001334 self.monitor = PidfileRunMonitor()
1335
1336
1337 def run(self):
1338 if not self._check_paired_results_exist():
1339 return
1340
1341 self._create_monitor()
1342 self.monitor.run(
1343 self._command_line(), self._working_directory(),
1344 num_processes=self.num_processes,
1345 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1346 pidfile_name=self._pidfile_name(),
1347 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001348 username=self.owner_username,
1349 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1350
1351
1352 def get_drone_hostnames_allowed(self):
1353 if not models.DroneSet.drone_sets_enabled():
1354 return None
1355
1356 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1357 if not hqes:
1358 # Only special tasks could be missing host queue entries
1359 assert isinstance(self, SpecialAgentTask)
1360 return self._user_or_global_default_drone_set(
1361 self.task, self.task.requested_by)
1362
1363 job_ids = hqes.values_list('job', flat=True).distinct()
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001364 assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
jamesren76fcf192010-04-21 20:39:50 +00001365 "span multiple jobs")
1366
1367 job = models.Job.objects.get(id=job_ids[0])
1368 drone_set = job.drone_set
1369 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001370 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001371
1372 return drone_set.get_drone_hostnames()
1373
1374
1375 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1376 """
1377 Returns the user's default drone set, if present.
1378
1379 Otherwise, returns the global default drone set.
1380 """
1381 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1382 if not user:
1383 logging.warn('%s had no owner; using default drone set',
1384 obj_with_owner)
1385 return default_hostnames
1386 if not user.drone_set:
1387 logging.warn('User %s has no default drone set, using global '
1388 'default', user.login)
1389 return default_hostnames
1390 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001391
1392
1393 def register_necessary_pidfiles(self):
1394 pidfile_id = _drone_manager.get_pidfile_id_from(
1395 self._working_directory(), self._pidfile_name())
1396 _drone_manager.register_pidfile(pidfile_id)
1397
1398 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1399 if paired_pidfile_id:
1400 _drone_manager.register_pidfile(paired_pidfile_id)
1401
1402
1403 def recover(self):
1404 if not self._check_paired_results_exist():
1405 return
1406
1407 self._create_monitor()
1408 self.monitor.attach_to_existing_process(
1409 self._working_directory(), pidfile_name=self._pidfile_name(),
1410 num_processes=self.num_processes)
1411 if not self.monitor.has_process():
1412 # no process to recover; wait to be started normally
1413 self.monitor = None
1414 return
1415
1416 self.started = True
Aviv Keshet225bdfe2013-03-05 10:10:08 -08001417 logging.info('Recovering process %s for %s at %s',
1418 self.monitor.get_process(), type(self).__name__,
1419 self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001420
1421
mbligh4608b002010-01-05 18:22:35 +00001422 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1423 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001424 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001425 for entry in queue_entries:
1426 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001427 raise host_scheduler.SchedulerError(
1428 '%s attempting to start entry with invalid status %s: '
1429 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001430 invalid_host_status = (
1431 allowed_host_statuses is not None
1432 and entry.host.status not in allowed_host_statuses)
1433 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001434 raise host_scheduler.SchedulerError(
1435 '%s attempting to start on queue entry with invalid '
1436 'host status %s: %s'
1437 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001438
1439
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001440SiteAgentTask = utils.import_site_class(
1441 __file__, 'autotest_lib.scheduler.site_monitor_db',
1442 'SiteAgentTask', BaseAgentTask)
1443
1444class AgentTask(SiteAgentTask):
1445 pass
1446
1447
showardd9205182009-04-27 20:09:55 +00001448class TaskWithJobKeyvals(object):
1449 """AgentTask mixin providing functionality to help with job keyval files."""
1450 _KEYVAL_FILE = 'keyval'
1451 def _format_keyval(self, key, value):
1452 return '%s=%s' % (key, value)
1453
1454
1455 def _keyval_path(self):
1456 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001457 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001458
1459
1460 def _write_keyval_after_job(self, field, value):
1461 assert self.monitor
1462 if not self.monitor.has_process():
1463 return
1464 _drone_manager.write_lines_to_file(
1465 self._keyval_path(), [self._format_keyval(field, value)],
1466 paired_with_process=self.monitor.get_process())
1467
1468
1469 def _job_queued_keyval(self, job):
1470 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1471
1472
1473 def _write_job_finished(self):
1474 self._write_keyval_after_job("job_finished", int(time.time()))
1475
1476
showarddb502762009-09-09 15:31:20 +00001477 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1478 keyval_contents = '\n'.join(self._format_keyval(key, value)
1479 for key, value in keyval_dict.iteritems())
1480 # always end with a newline to allow additional keyvals to be written
1481 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001482 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001483 keyval_contents,
1484 file_path=keyval_path)
1485
1486
1487 def _write_keyvals_before_job(self, keyval_dict):
1488 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1489
1490
1491 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001492 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001493 host.hostname)
1494 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001495 all_labels = [ urllib.quote(label) for label in all_labels ]
showarddb502762009-09-09 15:31:20 +00001496 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1497 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1498
1499
showard8cc058f2009-09-08 16:26:33 +00001500class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001501 """
1502 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1503 """
1504
1505 TASK_TYPE = None
1506 host = None
1507 queue_entry = None
1508
showardd1195652009-12-08 22:21:02 +00001509 def __init__(self, task, extra_command_args):
1510 super(SpecialAgentTask, self).__init__()
1511
lmrb7c5d272010-04-16 06:34:04 +00001512 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001513
jamesrenc44ae992010-02-19 00:12:54 +00001514 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001515 self.queue_entry = None
1516 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001517 self.queue_entry = scheduler_models.HostQueueEntry(
1518 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001519
showarded2afea2009-07-07 20:54:07 +00001520 self.task = task
1521 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001522
1523
showard8cc058f2009-09-08 16:26:33 +00001524 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001525 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1526
1527
1528 def _command_line(self):
1529 return _autoserv_command_line(self.host.hostname,
1530 self._extra_command_args,
1531 queue_entry=self.queue_entry)
1532
1533
1534 def _working_directory(self):
1535 return self.task.execution_path()
1536
1537
1538 @property
1539 def owner_username(self):
1540 if self.task.requested_by:
1541 return self.task.requested_by.login
1542 return None
showard8cc058f2009-09-08 16:26:33 +00001543
1544
showarded2afea2009-07-07 20:54:07 +00001545 def prolog(self):
1546 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001547 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001548 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001549
1550
showardde634ee2009-01-30 01:44:24 +00001551 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001552 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001553
showard2fe3f1d2009-07-06 20:19:11 +00001554 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001555 return # don't fail metahost entries, they'll be reassigned
1556
showard2fe3f1d2009-07-06 20:19:11 +00001557 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001558 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001559 return # entry has been aborted
1560
Alex Millerdfff2fd2013-05-28 13:05:06 -07001561 self._actually_fail_queue_entry()
1562
1563
1564 # TODO(milleral): http://crbug.com/268607
1565 # All this used to be a part of _fail_queue_entry. The
1566 # exact semantics of when one should and should not be failing a queue
1567 # entry need to be worked out, because provisioning has placed us in a
1568 # case where we want to fail a queue entry that could be requeued,
1569 # which makes us fail the two if-statement checks above, and thus
1570 # _fail_queue_entry() would exit early and have no effect.
1571 # What's left here with _actually_fail_queue_entry is a hack to be able to
1572 # bypass the checks and unconditionally execute the code.
1573 def _actually_fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001574 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001575 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001576 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001577 self._write_keyval_after_job(queued_key, queued_time)
1578 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001579
showard8cc058f2009-09-08 16:26:33 +00001580 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001581 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001582 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001583 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001584
showard8cc058f2009-09-08 16:26:33 +00001585 pidfile_id = _drone_manager.get_pidfile_id_from(
1586 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001587 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001588 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001589
1590 if self.queue_entry.job.parse_failed_repair:
1591 self._parse_results([self.queue_entry])
1592 else:
1593 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001594
Alex Miller23676a22013-07-03 09:03:36 -07001595 # Also fail all other special tasks that have not yet run for this HQE
1596 pending_tasks = models.SpecialTask.objects.filter(
1597 queue_entry__id=self.queue_entry.id,
1598 is_complete=0)
Alex Miller5e36ccc2013-08-03 16:31:58 -07001599 for task in pending_tasks:
1600 task.finish(False)
Alex Miller23676a22013-07-03 09:03:36 -07001601
showard8cc058f2009-09-08 16:26:33 +00001602
1603 def cleanup(self):
1604 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001605
1606 # We will consider an aborted task to be "Failed"
1607 self.task.finish(bool(self.success))
1608
showardf85a0b72009-10-07 20:48:45 +00001609 if self.monitor:
1610 if self.monitor.has_process():
1611 self._copy_results([self.task])
1612 if self.monitor.pidfile_id is not None:
1613 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001614
1615
Dan Shi07e09af2013-04-12 09:31:29 -07001616 def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
1617 """Remove a type of special task in all tasks, keep last one if needed.
1618
1619 @param special_task_to_remove: type of special task to be removed, e.g.,
1620 models.SpecialTask.Task.VERIFY.
1621 @param keep_last_one: True to keep the last special task if its type is
1622 the same as special_task_to_remove.
1623
1624 """
1625 queued_special_tasks = models.SpecialTask.objects.filter(
1626 host__id=self.host.id,
1627 task=special_task_to_remove,
1628 is_active=False, is_complete=False, queue_entry=None)
1629 if keep_last_one:
1630 queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
1631 queued_special_tasks.delete()
1632
1633
showard8cc058f2009-09-08 16:26:33 +00001634class RepairTask(SpecialAgentTask):
1635 TASK_TYPE = models.SpecialTask.Task.REPAIR
1636
1637
showardd1195652009-12-08 22:21:02 +00001638 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001639 """\
1640 @param task: the SpecialTask to run; if it has an associated queue entry, that entry is marked failed when this repair fails.
1641 """
1642 protection = host_protections.Protection.get_string(
1643 task.host.protection)
1644 # normalize the protection name
1645 protection = host_protections.Protection.get_attr_name(protection)
1646
1647 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001648 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001649
1650 # *don't* include the queue entry in IDs -- if the queue entry is
1651 # aborted, we want to leave the repair task running
1652 self._set_ids(host=self.host)
1653
1654
1655 def prolog(self):
1656 super(RepairTask, self).prolog()
1657 logging.info("repair_task starting")
1658 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001659
1660
jadmanski0afbb632008-06-06 21:10:57 +00001661 def epilog(self):
1662 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001663
jadmanski0afbb632008-06-06 21:10:57 +00001664 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001665 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001666 else:
showard8cc058f2009-09-08 16:26:33 +00001667 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001668 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001669 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001670
1671
showarded2afea2009-07-07 20:54:07 +00001672class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001673 def _copy_to_results_repository(self):
1674 if not self.queue_entry or self.queue_entry.meta_host:
1675 return
1676
1677 self.queue_entry.set_execution_subdir()
1678 log_name = os.path.basename(self.task.execution_path())
1679 source = os.path.join(self.task.execution_path(), 'debug',
1680 'autoserv.DEBUG')
1681 destination = os.path.join(
1682 self.queue_entry.execution_path(), log_name)
1683
1684 self.monitor.try_copy_to_results_repository(
1685 source, destination_path=destination)
1686
1687
showard170873e2009-01-07 00:22:26 +00001688 def epilog(self):
1689 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001690
showard775300b2009-09-09 15:30:50 +00001691 if self.success:
1692 return
showard8fe93b52008-11-18 17:53:22 +00001693
showard775300b2009-09-09 15:30:50 +00001694 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001695
showard775300b2009-09-09 15:30:50 +00001696 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001697 # effectively ignore failure for these hosts
1698 self.success = True
showard775300b2009-09-09 15:30:50 +00001699 return
1700
1701 if self.queue_entry:
1702 self.queue_entry.requeue()
Alex Millerf3f19452013-07-29 15:53:00 -07001703 # If we requeue an HQE, we should cancel any remaining pre-job
1704 # tasks against this host, otherwise we'll be left in a state
1705 # where a queued HQE has special tasks to run against a host.
1706 models.SpecialTask.objects.filter(
1707 queue_entry__id=self.queue_entry.id,
1708 host__id=self.host.id,
1709 is_complete=0).update(is_complete=1, success=0)
showard775300b2009-09-09 15:30:50 +00001710
1711 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001712 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001713 queue_entry__id=self.queue_entry.id):
1714 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1715 self._fail_queue_entry()
1716 return
1717
showard9bb960b2009-11-19 01:02:11 +00001718 queue_entry = models.HostQueueEntry.objects.get(
1719 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001720 else:
1721 queue_entry = None
1722
1723 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001724 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001725 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001726 queue_entry=queue_entry,
1727 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001728
showard8fe93b52008-11-18 17:53:22 +00001729
Alex Miller42437f92013-05-28 12:58:54 -07001730 def _should_pending(self):
1731 """
1732 Decide if we should call the host queue entry's on_pending method.
1733 We should if:
1734 1) There exists an associated host queue entry.
1735 2) The current special task completed successfully.
1736 3) There do not exist any more special tasks to be run before the
1737 host queue entry starts.
1738
1739 @returns: True if we should call pending, false if not.
1740
1741 """
1742 if not self.queue_entry or not self.success:
1743 return False
1744
1745 # We know if this is the last one when we create it, so we could add
1746 # another column to the database to keep track of this information, but
1747 # I expect the overhead of querying here to be minimal.
1748 queue_entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1749 queued = models.SpecialTask.objects.filter(
1750 host__id=self.host.id, is_active=False,
1751 is_complete=False, queue_entry=queue_entry)
1752 queued = queued.exclude(id=self.task.id)
1753 return queued.count() == 0
1754
1755
showard8fe93b52008-11-18 17:53:22 +00001756class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001757 TASK_TYPE = models.SpecialTask.Task.VERIFY
1758
1759
showardd1195652009-12-08 22:21:02 +00001760 def __init__(self, task):
1761 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001762 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001763
1764
jadmanski0afbb632008-06-06 21:10:57 +00001765 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001766 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001767
showardb18134f2009-03-20 20:52:18 +00001768 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001769 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001770 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1771 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001772
jamesren42318f72010-05-10 23:40:59 +00001773 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00001774 # and there's no need to keep records of other requests.
Dan Shi07e09af2013-04-12 09:31:29 -07001775 self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
1776 keep_last_one=True)
showard2fe3f1d2009-07-06 20:19:11 +00001777
mbligh36768f02008-02-22 18:28:33 +00001778
jadmanski0afbb632008-06-06 21:10:57 +00001779 def epilog(self):
1780 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001781 if self.success:
Alex Miller42437f92013-05-28 12:58:54 -07001782 if self._should_pending():
showard8cc058f2009-09-08 16:26:33 +00001783 self.queue_entry.on_pending()
1784 else:
1785 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001786
1787
mbligh4608b002010-01-05 18:22:35 +00001788class CleanupTask(PreJobTask):
1789 # note this can also run post-job, but when it does, it's running standalone
1790 # against the host (not related to the job), so it's not considered a
1791 # PostJobTask
1792
1793 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1794
1795
1796 def __init__(self, task, recover_run_monitor=None):
1797 super(CleanupTask, self).__init__(task, ['--cleanup'])
1798 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1799
1800
1801 def prolog(self):
1802 super(CleanupTask, self).prolog()
1803 logging.info("starting cleanup task for host: %s", self.host.hostname)
1804 self.host.set_status(models.Host.Status.CLEANING)
1805 if self.queue_entry:
Dan Shi07e09af2013-04-12 09:31:29 -07001806 self.queue_entry.set_status(models.HostQueueEntry.Status.CLEANING)
mbligh4608b002010-01-05 18:22:35 +00001807
1808
1809 def _finish_epilog(self):
1810 if not self.queue_entry or not self.success:
1811 return
1812
1813 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
1814 should_run_verify = (
1815 self.queue_entry.job.run_verify
1816 and self.host.protection != do_not_verify_protection)
1817 if should_run_verify:
1818 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1819 models.SpecialTask.objects.create(
1820 host=models.Host.objects.get(id=self.host.id),
1821 queue_entry=entry,
1822 task=models.SpecialTask.Task.VERIFY)
1823 else:
Alex Miller42437f92013-05-28 12:58:54 -07001824 if self._should_pending():
1825 self.queue_entry.on_pending()
mbligh4608b002010-01-05 18:22:35 +00001826
1827
1828 def epilog(self):
1829 super(CleanupTask, self).epilog()
1830
1831 if self.success:
1832 self.host.update_field('dirty', 0)
1833 self.host.set_status(models.Host.Status.READY)
1834
1835 self._finish_epilog()
1836
1837
Dan Shi07e09af2013-04-12 09:31:29 -07001838class ResetTask(PreJobTask):
1839 """Task to reset a DUT, including cleanup and verify."""
1840 # note this can also run post-job, but when it does, it's running standalone
1841 # against the host (not related to the job), so it's not considered a
1842 # PostJobTask
1843
1844 TASK_TYPE = models.SpecialTask.Task.RESET
1845
1846
1847 def __init__(self, task, recover_run_monitor=None):
1848 super(ResetTask, self).__init__(task, ['--reset'])
1849 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1850
1851
1852 def prolog(self):
1853 super(ResetTask, self).prolog()
1854 logging.info('starting reset task for host: %s',
1855 self.host.hostname)
1856 self.host.set_status(models.Host.Status.RESETTING)
1857 if self.queue_entry:
1858 self.queue_entry.set_status(models.HostQueueEntry.Status.RESETTING)
1859
1860 # Delete any queued cleanups for this host.
1861 self.remove_special_tasks(models.SpecialTask.Task.CLEANUP,
1862 keep_last_one=False)
1863
1864 # Delete any queued reverifies for this host.
1865 self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
1866 keep_last_one=False)
1867
1868 # Only one reset is needed.
1869 self.remove_special_tasks(models.SpecialTask.Task.RESET,
1870 keep_last_one=True)
1871
1872
1873 def epilog(self):
1874 super(ResetTask, self).epilog()
1875
1876 if self.success:
1877 self.host.update_field('dirty', 0)
Dan Shi07e09af2013-04-12 09:31:29 -07001878
Alex Millerba076c52013-07-11 10:11:48 -07001879 if self._should_pending():
Dan Shi07e09af2013-04-12 09:31:29 -07001880 self.queue_entry.on_pending()
Alex Millerdc608d52013-07-30 14:26:21 -07001881 else:
1882 self.host.set_status(models.Host.Status.READY)
Dan Shi07e09af2013-04-12 09:31:29 -07001883
1884
Alex Millerdfff2fd2013-05-28 13:05:06 -07001885class ProvisionTask(PreJobTask):
1886 TASK_TYPE = models.SpecialTask.Task.PROVISION
1887
1888 def __init__(self, task):
1889 # Provisioning requires that we be associated with a job/queue entry
1890 assert task.queue_entry, "No HQE associated with provision task!"
1891 # task.queue_entry is an afe model HostQueueEntry object.
1892 # self.queue_entry is a scheduler models HostQueueEntry object, but
1893 # it gets constructed and assigned in __init__, so it's not available
1894 # yet. Therefore, we're stuck pulling labels off of the afe model
1895 # so that we can pass the --provision args into the __init__ call.
1896 labels = {x.name for x in task.queue_entry.job.dependency_labels.all()}
1897 _, provisionable = provision.filter_labels(labels)
1898 extra_command_args = ['--provision', ','.join(provisionable)]
1899 super(ProvisionTask, self).__init__(task, extra_command_args)
1900 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1901
1902
1903 def _command_line(self):
1904 # If we give queue_entry to _autoserv_command_line, then it will append
1905 # -c for this invocation if the queue_entry is a client side test. We
1906 # don't want that, as it messes with provisioning, so we just drop it
1907 # from the arguments here.
1908 # Note that we also don't verify job_repo_url as provisioning tasks are
1909 # required to stage whatever content we need, and the job itself will
1910 # force autotest to be staged if it isn't already.
1911 return _autoserv_command_line(self.host.hostname,
1912 self._extra_command_args)
1913
1914
1915 def prolog(self):
1916 super(ProvisionTask, self).prolog()
1917 # add a check for a previous provision task and abort if one exists.
1918 logging.info("starting provision task for host: %s", self.host.hostname)
1919 self.queue_entry.set_status(
1920 models.HostQueueEntry.Status.PROVISIONING)
1921 self.host.set_status(models.Host.Status.PROVISIONING)
1922
1923
1924 def epilog(self):
1925 # TODO(milleral) Here, we override the PreJobTask's epilog, because
1926 # it's written with the idea that pre-job special task failures are a
1927 # problem with the host and not with something about the HQE.
1928 # In our case, the HQE's DEPENDENCIES specify what the provision task
1929 # does, so if the provision fails, it can be the fault of the HQE, and
1930 # thus we fail the HQE. This difference is handled only here for now,
1931 # but some refactoring of PreJobTask should likely happen sometime in
1932 # the future?
1933
1934 if not self.success:
1935 # TODO(milleral) http://crbug.com/231452
1936 # In our own setup, we don't really use the results
1937 # repository, so I *think* this call can be elided. However, I'd
1938 # like to limit what I can possibly break for now, and it would be
1939 # called if I called PreJobTask's epilog, so I'm keeping the call
1940 # to it for now.
1941 self._copy_to_results_repository()
1942 # _actually_fail_queue_entry() is a hack around the fact that we do
1943 # indeed want to abort the queue entry here, but the rest of the
1944 # scheduler code expects that we will reschedule onto some other
1945 # host.
1946 self._actually_fail_queue_entry()
1947 # This abort will mark the aborted bit on the HQE itself, to
1948 # signify that we're killing it. Technically it also will do
1949 # the recursive aborting of all child jobs, but that shouldn't
1950 # matter here, as only suites have children, and those
1951 # are hostless and thus don't have provisioning.
1952 queue_entry = models.HostQueueEntry.objects.get(
1953 id=self.queue_entry.id)
1954 queue_entry.abort()
1955 # Calling abort will set the aborted bit. However, complete is
1956 # still unset, so we end up kicking off a QueueTask and aborting
1957 # that also, which causes some chaos because we effectively abort
1958 # something twice. The end effect is that it hides the
1959 # reason field we try to inject via the abort reason.
1960 # Thus, we manually poke the complete bit here to suppress
1961 # the QueueTask abort.
1962 # TODO(milleral) http://crbug.com/268596
1963 # We really should be calling set_status here, but if we set the
1964 # HQE to status='Aborted', then suddenly we stop getting a reason
1965 # field showing up in run_suite. So for the moment, the tests will
1966 # be getting marked as Failed instead of Aborted, which should get
1967 # fixed.
1968 self.queue_entry.update_field('complete', 1)
1969 # The machine is in some totally unknown state, so let's kick off
1970 # a repair task to get it back to some known sane state.
1971 models.SpecialTask.objects.create(
1972 host=models.Host.objects.get(id=self.host.id),
1973 task=models.SpecialTask.Task.REPAIR,
1974 queue_entry=queue_entry,
1975 requested_by=self.task.requested_by)
1976 elif self._should_pending():
1977 self.queue_entry.on_pending()
1978 else:
1979 self.host.set_status(models.Host.Status.READY)
1980
1981
showarda9545c02009-12-18 22:44:26 +00001982class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
1983 """
1984 Common functionality for QueueTask and HostlessQueueTask
1985 """
1986 def __init__(self, queue_entries):
1987 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001988 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001989 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001990
1991
showard73ec0442009-02-07 02:05:20 +00001992 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001993 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001994
1995
jamesrenc44ae992010-02-19 00:12:54 +00001996 def _write_control_file(self, execution_path):
1997 control_path = _drone_manager.attach_file_to_execution(
1998 execution_path, self.job.control_file)
1999 return control_path
2000
2001
Aviv Keshet308e7362013-05-21 14:43:16 -07002002 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00002003 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00002004 execution_path = self.queue_entries[0].execution_path()
2005 control_path = self._write_control_file(execution_path)
2006 hostnames = ','.join(entry.host.hostname
2007 for entry in self.queue_entries
2008 if not entry.is_hostless())
2009
2010 execution_tag = self.queue_entries[0].execution_tag()
2011 params = _autoserv_command_line(
2012 hostnames,
beepscb6f1e22013-06-28 19:14:10 -07002013 ['-P', execution_tag, '-n', '--verify_job_repo_url',
jamesrenc44ae992010-02-19 00:12:54 +00002014 _drone_manager.absolute_path(control_path)],
2015 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07002016 if self.job.is_image_update_job():
2017 params += ['--image', self.job.update_image_path]
2018
jamesrenc44ae992010-02-19 00:12:54 +00002019 return params
showardd1195652009-12-08 22:21:02 +00002020
2021
2022 @property
2023 def num_processes(self):
2024 return len(self.queue_entries)
2025
2026
2027 @property
2028 def owner_username(self):
2029 return self.job.owner
2030
2031
2032 def _working_directory(self):
2033 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00002034
2035
jadmanski0afbb632008-06-06 21:10:57 +00002036 def prolog(self):
showardd9205182009-04-27 20:09:55 +00002037 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00002038 keyval_dict = self.job.keyval_dict()
2039 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00002040 group_name = self.queue_entries[0].get_group_name()
2041 if group_name:
2042 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00002043 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00002044 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002045 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00002046 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00002047
2048
showard35162b02009-03-03 02:17:30 +00002049 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00002050 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00002051 _drone_manager.write_lines_to_file(error_file_path,
2052 [_LOST_PROCESS_ERROR])
2053
2054
showardd3dc1992009-04-22 21:01:40 +00002055 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00002056 if not self.monitor:
2057 return
2058
showardd9205182009-04-27 20:09:55 +00002059 self._write_job_finished()
2060
showard35162b02009-03-03 02:17:30 +00002061 if self.monitor.lost_process:
2062 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00002063
jadmanskif7fa2cc2008-10-01 14:13:23 +00002064
showardcbd74612008-11-19 21:42:02 +00002065 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00002066 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00002067 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00002068 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00002069 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00002070
2071
jadmanskif7fa2cc2008-10-01 14:13:23 +00002072 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00002073 if not self.monitor or not self.monitor.has_process():
2074 return
2075
jadmanskif7fa2cc2008-10-01 14:13:23 +00002076 # build up sets of all the aborted_by and aborted_on values
2077 aborted_by, aborted_on = set(), set()
2078 for queue_entry in self.queue_entries:
2079 if queue_entry.aborted_by:
2080 aborted_by.add(queue_entry.aborted_by)
2081 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
2082 aborted_on.add(t)
2083
2084 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00002085 # TODO(showard): this conditional is now obsolete, we just need to leave
2086 # it in temporarily for backwards compatibility over upgrades. delete
2087 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00002088 assert len(aborted_by) <= 1
2089 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00002090 aborted_by_value = aborted_by.pop()
2091 aborted_on_value = max(aborted_on)
2092 else:
2093 aborted_by_value = 'autotest_system'
2094 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00002095
showarda0382352009-02-11 23:36:43 +00002096 self._write_keyval_after_job("aborted_by", aborted_by_value)
2097 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00002098
showardcbd74612008-11-19 21:42:02 +00002099 aborted_on_string = str(datetime.datetime.fromtimestamp(
2100 aborted_on_value))
2101 self._write_status_comment('Job aborted by %s on %s' %
2102 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00002103
2104
jadmanski0afbb632008-06-06 21:10:57 +00002105 def abort(self):
showarda9545c02009-12-18 22:44:26 +00002106 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00002107 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00002108 self._finish_task()
showard21baa452008-10-21 00:08:39 +00002109
2110
jadmanski0afbb632008-06-06 21:10:57 +00002111 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00002112 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002113 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002114
2115
2116class QueueTask(AbstractQueueTask):
2117 def __init__(self, queue_entries):
2118 super(QueueTask, self).__init__(queue_entries)
2119 self._set_ids(queue_entries=queue_entries)
2120
2121
2122 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002123 self._check_queue_entry_statuses(
2124 self.queue_entries,
2125 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
2126 models.HostQueueEntry.Status.RUNNING),
2127 allowed_host_statuses=(models.Host.Status.PENDING,
2128 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00002129
2130 super(QueueTask, self).prolog()
2131
2132 for queue_entry in self.queue_entries:
2133 self._write_host_keyvals(queue_entry.host)
2134 queue_entry.host.set_status(models.Host.Status.RUNNING)
2135 queue_entry.host.update_field('dirty', 1)
showarda9545c02009-12-18 22:44:26 +00002136
2137
2138 def _finish_task(self):
2139 super(QueueTask, self)._finish_task()
2140
2141 for queue_entry in self.queue_entries:
2142 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00002143 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00002144
2145
mbligh4608b002010-01-05 18:22:35 +00002146class HostlessQueueTask(AbstractQueueTask):
2147 def __init__(self, queue_entry):
2148 super(HostlessQueueTask, self).__init__([queue_entry])
2149 self.queue_entry_ids = [queue_entry.id]
2150
2151
2152 def prolog(self):
2153 self.queue_entries[0].update_field('execution_subdir', 'hostless')
2154 super(HostlessQueueTask, self).prolog()
2155
2156
mbligh4608b002010-01-05 18:22:35 +00002157 def _finish_task(self):
2158 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00002159 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00002160
2161
showardd3dc1992009-04-22 21:01:40 +00002162class PostJobTask(AgentTask):
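# Base class for tasks that run after the job's autoserv process has exited
# (log gathering, final parsing, archiving). Instead of pairing with a new
# job process, it attaches a PidfileRunMonitor to the existing autoserv
# pidfile in the queue entries' execution directory.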
showardd1195652009-12-08 22:21:02 +00002163 def __init__(self, queue_entries, log_file_name):
2164 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00002165
showardd1195652009-12-08 22:21:02 +00002166 self.queue_entries = queue_entries
2167
showardd3dc1992009-04-22 21:01:40 +00002168 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00002169 self._autoserv_monitor.attach_to_existing_process(
2170 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00002171
showardd1195652009-12-08 22:21:02 +00002172
2173 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00002174 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00002175 return 'true'
2176 return self._generate_command(
2177 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00002178
2179
2180 def _generate_command(self, results_dir):
2181 raise NotImplementedError('Subclasses must override this')
2182
2183
showardd1195652009-12-08 22:21:02 +00002184 @property
2185 def owner_username(self):
2186 return self.queue_entries[0].job.owner
2187
2188
2189 def _working_directory(self):
2190 return self._get_consistent_execution_path(self.queue_entries)
2191
2192
2193 def _paired_with_monitor(self):
2194 return self._autoserv_monitor
2195
2196
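# Returns True if the job's queue entries were aborted; if the entries
# disagree, an email is enqueued and True is assumed.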
showardd3dc1992009-04-22 21:01:40 +00002197 def _job_was_aborted(self):
2198 was_aborted = None
showardd1195652009-12-08 22:21:02 +00002199 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002200 queue_entry.update_from_database()
2201 if was_aborted is None: # first queue entry
2202 was_aborted = bool(queue_entry.aborted)
2203 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
jamesren17cadd62010-06-16 23:26:55 +00002204 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
2205 for entry in self.queue_entries]
showardd3dc1992009-04-22 21:01:40 +00002206 email_manager.manager.enqueue_notify_email(
jamesren17cadd62010-06-16 23:26:55 +00002207 'Inconsistent abort state',
2208 'Queue entries have inconsistent abort state:\n' +
2209 '\n'.join(entries))
showardd3dc1992009-04-22 21:01:40 +00002210 # don't crash here, just assume true
2211 return True
2212 return was_aborted
2213
2214
showardd1195652009-12-08 22:21:02 +00002215 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002216 if self._job_was_aborted():
2217 return models.HostQueueEntry.Status.ABORTED
2218
2219 # we'll use a PidfileRunMonitor to read the autoserv exit status
2220 if self._autoserv_monitor.exit_code() == 0:
2221 return models.HostQueueEntry.Status.COMPLETED
2222 return models.HostQueueEntry.Status.FAILED
2223
2224
showardd3dc1992009-04-22 21:01:40 +00002225 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002226 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002227 queue_entry.set_status(status)
2228
2229
2230 def abort(self):
2231 # override AgentTask.abort() to avoid killing the process and ending
2232 # the task. post-job tasks continue when the job is aborted.
2233 pass
2234
2235
mbligh4608b002010-01-05 18:22:35 +00002236 def _pidfile_label(self):
2237 # '.autoserv_execute' -> 'autoserv'
2238 return self._pidfile_name()[1:-len('_execute')]
2239
2240
showard9bb960b2009-11-19 01:02:11 +00002241class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002242 """
2243 Task responsible for
2244 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2245 * copying logs to the results repository
2246 * spawning CleanupTasks for hosts, if necessary
2247 * spawning a FinalReparseTask for the job
2248 """
showardd1195652009-12-08 22:21:02 +00002249 def __init__(self, queue_entries, recover_run_monitor=None):
2250 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002251 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002252 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002253 self._set_ids(queue_entries=queue_entries)
2254
2255
Aviv Keshet308e7362013-05-21 14:43:16 -07002256 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd3dc1992009-04-22 21:01:40 +00002257 def _generate_command(self, results_dir):
2258 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002259 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00002260 return [_autoserv_path, '-p',
2261 '--pidfile-label=%s' % self._pidfile_label(),
2262 '--use-existing-results', '--collect-crashinfo',
2263 '-m', host_list, '-r', results_dir]
showardd3dc1992009-04-22 21:01:40 +00002264
2265
showardd1195652009-12-08 22:21:02 +00002266 @property
2267 def num_processes(self):
2268 return len(self.queue_entries)
2269
2270
2271 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002272 return drone_manager.CRASHINFO_PID_FILE
showardd1195652009-12-08 22:21:02 +00002273
2274
showardd3dc1992009-04-22 21:01:40 +00002275 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002276 self._check_queue_entry_statuses(
2277 self.queue_entries,
2278 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
2279 allowed_host_statuses=(models.Host.Status.RUNNING,))
showard8cc058f2009-09-08 16:26:33 +00002280
showardd3dc1992009-04-22 21:01:40 +00002281 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002282
2283
showardd3dc1992009-04-22 21:01:40 +00002284 def epilog(self):
2285 super(GatherLogsTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002286 self._parse_results(self.queue_entries)
showard9bb960b2009-11-19 01:02:11 +00002287 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002288
showard9bb960b2009-11-19 01:02:11 +00002289
2290 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002291 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002292 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002293 models.HostQueueEntry.Status.COMPLETED)
2294 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2295 else:
2296 final_success = False
2297 num_tests_failed = 0
showard9bb960b2009-11-19 01:02:11 +00002298 reboot_after = self._job.reboot_after
2299 do_reboot = (
2300 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002301 self._final_status() == models.HostQueueEntry.Status.ABORTED
jamesrendd855242010-03-02 22:23:44 +00002302 or reboot_after == model_attributes.RebootAfter.ALWAYS
2303 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
Dan Shi07e09af2013-04-12 09:31:29 -07002304 and final_success and num_tests_failed == 0)
2305 or num_tests_failed > 0)
showard9bb960b2009-11-19 01:02:11 +00002306
showardd1195652009-12-08 22:21:02 +00002307 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002308 if do_reboot:
2309 # don't pass the queue entry to the CleanupTask. if the cleanup
2310 # fails, the job doesn't care -- it's over.
2311 models.SpecialTask.objects.create(
2312 host=models.Host.objects.get(id=queue_entry.host.id),
2313 task=models.SpecialTask.Task.CLEANUP,
2314 requested_by=self._job.owner_model())
2315 else:
2316 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002317
2318
showard0bbfc212009-04-29 21:06:13 +00002319 def run(self):
showard597bfd32009-05-08 18:22:50 +00002320 autoserv_exit_code = self._autoserv_monitor.exit_code()
2321 # only run if Autoserv exited due to some signal. if we have no exit
2322 # code, assume something bad (and signal-like) happened.
2323 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002324 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002325 else:
2326 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002327
2328
mbligh4608b002010-01-05 18:22:35 +00002329class SelfThrottledPostJobTask(PostJobTask):
2330 """
2331 Special AgentTask subclass that maintains its own global process limit.
2332 """
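# _num_running_processes is a class-level counter; run() and tick() only
# launch a new process while the count is below _max_processes(), and
# otherwise keep retrying via _try_starting_process().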
2333 _num_running_processes = 0
showarded2afea2009-07-07 20:54:07 +00002334
2335
mbligh4608b002010-01-05 18:22:35 +00002336 @classmethod
2337 def _increment_running_processes(cls):
2338 cls._num_running_processes += 1
mbligh16c722d2008-03-05 00:58:44 +00002339
mblighd5c95802008-03-05 00:33:46 +00002340
mbligh4608b002010-01-05 18:22:35 +00002341 @classmethod
2342 def _decrement_running_processes(cls):
2343 cls._num_running_processes -= 1
showard8cc058f2009-09-08 16:26:33 +00002344
2345
mbligh4608b002010-01-05 18:22:35 +00002346 @classmethod
2347 def _max_processes(cls):
2348 raise NotImplementedError
2349
2350
2351 @classmethod
2352 def _can_run_new_process(cls):
2353 return cls._num_running_processes < cls._max_processes()
2354
2355
    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # Override tick() to keep trying to start until the process count
        # drops and we can run; once started, revert to the default behavior.
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        # Override run() so the command is only launched once we are allowed
        # to run it.
        self._try_starting_process()


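    # Launch the underlying command only if we are under _max_processes();
    # the counter is bumped only when a process was actually started
    # (self.monitor set), presumably so a run that never produced a process
    # does not permanently consume a slot.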
    def _try_starting_process(self):
        if not self._can_run_new_process():
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()


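# A minimal subclass sketch (illustrative only, not part of the scheduler):
# a SelfThrottledPostJobTask just needs a _max_processes() limit plus the
# usual PostJobTask hooks, e.g.
#
#     class HypotheticalCompressLogsTask(SelfThrottledPostJobTask):
#         @classmethod
#         def _max_processes(cls):
#             return 1  # assumed limit, purely for illustration
#
#         def _pidfile_name(self):
#             return 'hypothetical-compress.pid'
#
#         def _generate_command(self, results_dir):
#             return ['tar', 'czf', results_dir + '.tar.gz', results_dir]
#
# FinalReparseTask and ArchiveResultsTask below are the two real users of
# this pattern.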
class FinalReparseTask(SelfThrottledPostJobTask):
    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


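    # Build the command line for the standalone results parser (_parser_path,
    # the tko parser); '--write-pidfile' lets the drone manager track the
    # parser through its pidfile.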
    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                results_dir]


    @property
    def num_processes(self):
        return 0  # don't include parser processes in accounting


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


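    # The parser limit comes from scheduler_config (the scheduler's
    # global_config-backed settings); the analogous max_transfer_processes
    # knob below throttles ArchiveResultsTask.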
    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)
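        # Parsing is done; hand the entries over to the archiving stage.
        # ArchiveResultsTask below picks them up once they reach ARCHIVING.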
showard97aed502008-11-04 02:01:24 +00002428
2429
mbligh4608b002010-01-05 18:22:35 +00002430class ArchiveResultsTask(SelfThrottledPostJobTask):
showarde1575b52010-01-15 00:21:12 +00002431 _ARCHIVING_FAILED_FILE = '.archiver_failed'
2432
mbligh4608b002010-01-05 18:22:35 +00002433 def __init__(self, queue_entries):
2434 super(ArchiveResultsTask, self).__init__(queue_entries,
2435 log_file_name='.archiving.log')
2436 # don't use _set_ids, since we don't want to set the host_ids
2437 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00002438
2439
mbligh4608b002010-01-05 18:22:35 +00002440 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002441 return drone_manager.ARCHIVER_PID_FILE
showard97aed502008-11-04 02:01:24 +00002442
2443
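    # Archiving reuses autoserv itself: it is pointed at the existing results
    # directory and given a dedicated server control file
    # (archive_results.control.srv), so no new test run is started.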
    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                             'archive_results.control.srv')]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


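    # On failure, drop an '.archiver_failed' marker (containing the exit code)
    # next to the results, paired with the original autoserv process so it
    # lands on the right drone; the queue entry statuses are finalized either
    # way.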
    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            paired_process = self._paired_with_monitor().get_process()
            _drone_manager.write_lines_to_file(
                    failed_file, ['Archiving failed with exit code %s'
                                  % self.monitor.exit_code()],
                    paired_with_process=paired_process)
        self._set_all_statuses(self._final_status())


if __name__ == '__main__':
    main()