#!/usr/bin/python -u
#pylint: disable-msg=C0111

"""
Autotest scheduler
"""


import datetime, optparse, os, signal
import sys, time, traceback, urllib
import logging, gc

import common
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.server import autoserv_utils
from autotest_lib.server.cros import provision
from autotest_lib.site_utils.graphite import stats

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_directory = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server')
_autoserv_path = os.path.join(_autoserv_directory, 'autoserv')
_testing_mode = False
_drone_manager = None

def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60

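# Illustrative arithmetic for the helper above (the config value is a
# hypothetical example, not a recommendation): with pidfile_timeout_mins = 5
# in the SCHEDULER section, _get_pidfile_timeout_secs() returns 5 * 60 = 300.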

def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()

def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
            logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
                DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host',
            default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to look up the Job object.
    """
    return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)

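# Sketch of a typical call to the helper above. The machine names and the
# extra argument are hypothetical placeholders, not flags defined in this
# module; the returned list is a ready-to-exec argv suitable for
# PidfileRunMonitor.run() below.
#
#   cmd = _autoserv_command_line('host1,host2',
#                                extra_args=['--example-flag'],  # placeholder
#                                job=some_job)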

class BaseDispatcher(object):


    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def _log_extra_msg(self, msg):
        if self._extra_debugging:
            logging.debug(msg)


    def tick(self):
        """
        An altered version of tick() that records when each major step
        begins, so we can figure out where most of the tick time is spent.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatcher's list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to: 1. a dict keyed by the queue_entry_ids given to it, and
        2. a dict keyed by the host_ids given to it. So theoretically, a host
        can have any number of agents associated with it, and each of them
        can have any special agent task, though in practice we never see
        > 1 agent/task per host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)

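    # Illustration of the bookkeeping above (hypothetical ids): after
    # add_agent_task(t) where t.host_ids == {3} and t.queue_entry_ids == {7},
    # the same Agent instance appears in both self._host_agents[3] and
    # self._queue_entry_agents[7], so it can be looked up from either side.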

    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all HQEs in the specified states.

        Loosely this translates to taking an HQE in one of the specified
        states, say parsing, and getting an AgentTask for it, like the
        FinalReparseTask, through _get_agent_task_for_queue_entry. Each queue
        entry can only have one agent task at a time, but there might be
        multiple queue entries in the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying
        # is handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))

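    # Recap of the status-to-task mapping implemented above:
    #   Starting/Running -> QueueTask (HostlessQueueTask if no host),
    #   Gathering        -> GatherLogsTask,
    #   Parsing          -> FinalReparseTask,
    #   Archiving        -> ArchiveResultsTask.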

    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a null HQE. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a
        SpecialAgentTask object contains an HQE, it's passed on to the
        special agent task, which creates a HostQueueEntry and saves it as
        its queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask,
                                      ResetTask, ProvisionTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where="status = '%s'" % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify, then reset, then provision.

        @return: list of afe.models.SpecialTasks sorted according to priority.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is
        # for that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY,
                               models.SpecialTask.Task.RESET,
                               models.SpecialTask.Task.PROVISION]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)

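    # Worked example for the sort above (hypothetical queue): given queued
    # tasks [VERIFY, REPAIR, CLEANUP], task_priority_key maps them to indices
    # [2, 0, 1] in task_priority_order, so sorted() returns them as
    # [REPAIR, CLEANUP, VERIFY].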

    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.

        Special tasks include PreJobTasks like verify, reset and cleanup.
        They are created through _schedule_new_jobs and associated with an
        HQE. This method translates SpecialTasks to the appropriate AgentTask
        and adds them to the dispatcher's agent list, so _handle_agents can
        execute them.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning', "
                "'Provisioning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        """
        Fetch a list of new host queue entries.

        The ordering of this list is important, as every new agent
        we schedule can potentially contribute to the process count
        on the drone, which has a static limit. The sort order
        prioritizes jobs as follows:
        1. High priority jobs: Based on the afe_job's priority
        2. With hosts and metahosts: This will only happen if we don't
           activate the hqe after assigning a host to it in
           schedule_new_jobs.
        3. With hosts but without metahosts: When tests are scheduled
           through the frontend the owner of the job would have chosen
           a host for it.
        4. Without hosts but with metahosts: This is the common case of
           a new test that needs a DUT. We assign a host and set it to
           active so it shouldn't show up in case 2 on the next tick.
        5. Without hosts and without metahosts: Hostless suite jobs, that
           will result in new jobs that fall under category 4.

        A note about the ordering of cases 3 and 4:
        Prioritizing one case above the other leads to earlier acquisition
        of the following resources: 1. process slots on the drone 2. machines.
        - When a user schedules a job through the afe they choose a specific
          host for it. Jobs with metahost can utilize any host that satisfies
          the metahost criterion. This means that if we had scheduled 4
          before 3 there is a good chance that a job which could've used
          another host, will now use the host assigned to a metahost-less
          job. Given the availability of machines in pool:suites, this
          almost guarantees starvation for jobs scheduled through the
          frontend.
        - Scheduling 4 before 3 also has its pros however, since a suite
          has the concept of a time out, whereas users can wait. If we hit
          the process count on the drone a suite can timeout waiting on the
          test, but a user job generally has a much longer timeout, and
          relatively harmless consequences.
        The current ordering was chosen because it is more likely that we
        will run out of machines in pool:suites than processes on the drone.

        @returns A list of HQEs ordered according to sort_order.
        """
        sort_order = ('afe_jobs.priority DESC, '
                      'ISNULL(host_id), '
                      'ISNULL(meta_host), '
                      'job_id')
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by=sort_order))

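    # Reading the ORDER BY above: MySQL's ISNULL() returns 0 for non-NULL and
    # 1 for NULL, and both ISNULL columns sort ascending. Within a priority
    # level, entries that already have a host therefore come before host-less
    # ones, and ISNULL(meta_host) then orders cases 2-5 as in the docstring.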

    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list.  Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
        """
        Find any new HQEs and call schedule_pre_job_tasks for each.

        This involves setting the status of the HQE and creating a row in the
        db corresponding to the special task, through
        scheduler_models._queue_special_task. The new db row is then added as
        an agent to the dispatcher through _schedule_special_tasks and
        scheduled for execution on the drone through _handle_agents.
        """
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        new_hostless_jobs = 0
        new_atomic_groups = 0
        new_jobs_with_hosts = 0
        new_jobs_need_hosts = 0

        logging.debug('Processing %d queue_entries', len(queue_entries))
        for queue_entry in queue_entries:
            self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._log_extra_msg('Scheduling hostless job.')
                self._schedule_hostless_job(queue_entry)
                new_hostless_jobs = new_hostless_jobs + 1
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
                new_atomic_groups = new_atomic_groups + 1
            else:
                new_jobs_need_hosts = new_jobs_need_hosts + 1
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)
                    new_jobs_with_hosts = new_jobs_with_hosts + 1

        key = 'scheduler.jobs_per_tick'
        stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
        stats.Gauge(key).send('new_atomic_groups', new_atomic_groups)
        stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
        stats.Gauge(key).send('new_jobs_without_hosts',
                              new_jobs_need_hosts - new_jobs_with_hosts)

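    # Example of the per-tick gauges above (hypothetical numbers): a tick
    # that schedules 2 hostless jobs and finds hosts for 3 of 5 host-needing
    # entries reports new_hostless_jobs=2, new_jobs_with_hosts=3 and
    # new_jobs_without_hosts=2.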

    def _schedule_running_host_queue_entries(self):
        """
        Adds agents to the dispatcher.

        Any AgentTask, like the QueueTask, is wrapped in an Agent. The
        QueueTask for example, will have a job with a control file, and
        the agent will have methods that poll, abort and check if the queue
        task is finished. The dispatcher runs the agent_task, as well as
        other agents in its _agents member, through _handle_agents, by
        calling the Agent's tick().

        This method creates an agent for each HQE in one of (starting,
        running, gathering, parsing, archiving) states, and adds it to the
        dispatcher so it is handled by _handle_agents.
        """
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
                            queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        """
        Looks through the afe_host_queue_entries for aborted entries.

        The aborted bit is set on an HQE in many ways, the most common
        being when a user requests an abort through the frontend, which
        results in an rpc from the afe to abort_host_queue_entries.
        """
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted=1 and complete=0'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _find_aborted_special_tasks(self):
        """
        Find SpecialTasks that have been marked for abortion.

        Poll the database looking for SpecialTasks that are active
        and have been marked for abortion, then abort them.
        """

        # The completed and active bits are very important when it comes
        # to scheduler correctness. The active bit is set through the prolog
        # of a special task, and reset through the cleanup method of the
        # SpecialAgentTask. The cleanup is called both through the abort and
        # epilog. The complete bit is set in several places, and in general
        # a hanging job will have is_active=1 is_complete=0, while a special
        # task which completed will have is_active=0 is_complete=1. To check
        # aborts we directly check active because the complete bit is set in
        # several places, including the epilog of agent tasks.
        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
                                                          is_aborted=True)
        for task in aborted_tasks:
            # There are 2 ways to get the agent associated with a task,
            # through the host and through the hqe. A special task
            # always needs a host, but doesn't always need an hqe.
            for agent in self._host_agents.get(task.host.id, []):
                if isinstance(agent.task, SpecialAgentTask):

                    # The epilog performs critical actions such as
                    # queueing the next SpecialTask, requeuing the
                    # hqe etc, however it doesn't actually kill the
                    # monitor process and set the 'done' bit. Epilogs
                    # assume that the job failed, and that the monitor
                    # process has already written an exit code. The
                    # done bit is a necessary condition for
                    # _handle_agents to schedule any more special
                    # tasks against the host, and it must be set
                    # in addition to is_active, is_complete and success.
                    agent.task.epilog()
                    agent.task.abort()


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it
        # to run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True

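    # Worked example of the throttling above (hypothetical limit): with
    # max_processes_started_per_cycle = 50, a 10-process agent is admitted
    # while num_started_this_cycle <= 40, and the first agent of a cycle is
    # always admitted even if it alone exceeds the per-cycle limit.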

    def _handle_agents(self):
        """
        Handles agents of the dispatcher.

        Appropriate Agents are added to the dispatcher through
        _schedule_running_host_queue_entries. These agents each
        have a task. This method runs the agent's task through
        agent.tick(), leading to:
            agent.start
                prolog -> AgentTask's prolog
                    For each queue entry:
                        sets host status/status to Running
                        sets started_on in afe_host_queue_entries
                run -> AgentTask's run
                    Creates PidfileRunMonitor
                    Queues the autoserv command line for this AgentTask
                    via the drone manager. These commands are executed
                    through the drone manager's execute actions.
                poll -> AgentTask's/BaseAgentTask's poll
                    checks the monitor's exit_code.
                    Executes epilog if task is finished.
                    Executes AgentTask's _finish_task
                    finish_task is usually responsible for setting the
                    status of the HQE/host, and updating its active and
                    complete fields.

            agent.is_done
                Removes the agent from the dispatcher's _agents queue.
                is_done checks the finished bit on the agent, which is
                set based on the Agent's task. During the agent's poll
                we check to see if the monitor process has exited in
                its finish method, and set the success member of the
                task based on this exit code.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_cycle += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes. %d added this cycle.',
                     _drone_manager.total_running_processes(),
                     num_started_this_cycle)


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)
            except Exception, ex:
                logging.exception(ex)
                # TODO: send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0:  # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()

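    # loop_count semantics implemented above: 1 means delete the recurring
    # run after this execution, 0 means repeat forever, and N > 1 means
    # decrement the counter and push start_date forward by loop_period
    # seconds.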

SiteDispatcher = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    pass

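# Site extension hook: if autotest_lib.scheduler.site_monitor_db defines a
# SiteDispatcher class, it becomes the base of Dispatcher; otherwise
# import_site_class falls back to BaseDispatcher and Dispatcher adds nothing.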

class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but
        only used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command

1121
1122 def _set_start_time(self):
1123 self._start_time = time.time()
1124
1125
showard418785b2009-11-23 20:19:59 +00001126 def run(self, command, working_directory, num_processes, nice_level=None,
1127 log_file=None, pidfile_name=None, paired_with_pidfile=None,
jamesren76fcf192010-04-21 20:39:50 +00001128 username=None, drone_hostnames_allowed=None):
showard170873e2009-01-07 00:22:26 +00001129 assert command is not None
1130        command = self._add_nice_command(command, nice_level)
1132 self._set_start_time()
1133 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001134 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001135 num_processes=num_processes, log_file=log_file,
jamesren76fcf192010-04-21 20:39:50 +00001136 paired_with_pidfile=paired_with_pidfile, username=username,
1137 drone_hostnames_allowed=drone_hostnames_allowed)
showard170873e2009-01-07 00:22:26 +00001138
1139
showarded2afea2009-07-07 20:54:07 +00001140 def attach_to_existing_process(self, execution_path,
jamesrenc44ae992010-02-19 00:12:54 +00001141 pidfile_name=drone_manager.AUTOSERV_PID_FILE,
showardd1195652009-12-08 22:21:02 +00001142 num_processes=None):
showard170873e2009-01-07 00:22:26 +00001143 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001144 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001145 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +00001146 if num_processes is not None:
1147 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +00001148
1149
jadmanski0afbb632008-06-06 21:10:57 +00001150 def kill(self):
showard170873e2009-01-07 00:22:26 +00001151 if self.has_process():
1152 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001153
mbligh36768f02008-02-22 18:28:33 +00001154
showard170873e2009-01-07 00:22:26 +00001155 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001156 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001157 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001158
1159
showard170873e2009-01-07 00:22:26 +00001160 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001161 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001162 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001163 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001164
1165
showard170873e2009-01-07 00:22:26 +00001166 def _read_pidfile(self, use_second_read=False):
1167 assert self.pidfile_id is not None, (
1168 'You must call run() or attach_to_existing_process()')
1169 contents = _drone_manager.get_pidfile_contents(
1170 self.pidfile_id, use_second_read=use_second_read)
1171 if contents.is_invalid():
1172 self._state = drone_manager.PidfileContents()
1173 raise self._PidfileException(contents)
1174 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001175
1176
showard21baa452008-10-21 00:08:39 +00001177 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001178 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1179 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001180 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001181 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001182
1183
1184 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001185 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001186 return
mblighbb421852008-03-11 22:36:16 +00001187
showard21baa452008-10-21 00:08:39 +00001188 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001189
showard170873e2009-01-07 00:22:26 +00001190 if self._state.process is None:
1191 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001192 return
mbligh90a549d2008-03-25 23:52:34 +00001193
showard21baa452008-10-21 00:08:39 +00001194 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001195 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001196 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001197 return
mbligh90a549d2008-03-25 23:52:34 +00001198
showard170873e2009-01-07 00:22:26 +00001199 # pid but no running process - maybe process *just* exited
1200 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001201 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001202 # autoserv exited without writing an exit code
1203 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001204 self._handle_pidfile_error(
1205 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001206
showard21baa452008-10-21 00:08:39 +00001207
1208 def _get_pidfile_info(self):
1209 """\
1210 After completion, self._state will contain:
1211 pid=None, exit_status=None if autoserv has not yet run
1212 pid!=None, exit_status=None if autoserv is running
1213 pid!=None, exit_status!=None if autoserv has completed
1214 """
1215 try:
1216 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001217        except self._PidfileException:
showard21baa452008-10-21 00:08:39 +00001218 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001219
1220
showard170873e2009-01-07 00:22:26 +00001221 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001222 """\
1223 Called when no pidfile is found or no pid is in the pidfile.
1224 """
showard170873e2009-01-07 00:22:26 +00001225 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001226 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001227 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001228 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001229 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001230
1231
showard35162b02009-03-03 02:17:30 +00001232 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001233 """\
1234 Called when autoserv has exited without writing an exit status,
1235 or we've timed out waiting for autoserv to write a pid to the
1236 pidfile. In either case, we just return failure and the caller
1237 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001238
showard170873e2009-01-07 00:22:26 +00001239 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001240 """
1241 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001242 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001243 self._state.exit_status = 1
1244 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001245
1246
jadmanski0afbb632008-06-06 21:10:57 +00001247 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001248 self._get_pidfile_info()
1249 return self._state.exit_status
1250
1251
1252 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001253 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001254 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001255 if self._state.num_tests_failed is None:
1256 return -1
showard21baa452008-10-21 00:08:39 +00001257 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001258
1259
showardcdaeae82009-08-31 18:32:48 +00001260 def try_copy_results_on_drone(self, **kwargs):
1261 if self.has_process():
1262 # copy results logs into the normal place for job results
1263 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1264
1265
1266 def try_copy_to_results_repository(self, source, **kwargs):
1267 if self.has_process():
1268 _drone_manager.copy_to_results_repository(self.get_process(),
1269 source, **kwargs)
1270
1271
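def _example_pidfile_run_monitor_usage():
    """
    Illustrative sketch only -- nothing in the scheduler calls this. It shows
    the two PidfileRunMonitor entry points and the polling pattern AgentTasks
    rely on. The command, paths and username below are made up.
    """
    monitor = PidfileRunMonitor()
    monitor.run(['autoserv', '--help'],
                working_directory='hosts/example-host/1-verify',
                num_processes=1, nice_level=AUTOSERV_NICE_LEVEL,
                username='example-user')
    # On each later scheduler tick, exit_code() re-reads the pidfile:
    if monitor.exit_code() is not None:
        success = (monitor.exit_code() == 0
                   and monitor.num_tests_failed() == 0)

    # After a scheduler restart, recovery reattaches instead of re-running:
    recovered = PidfileRunMonitor()
    recovered.attach_to_existing_process('hosts/example-host/1-verify')
    if not recovered.has_process():
        pass  # nothing to recover; the task would be started normally

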
mbligh36768f02008-02-22 18:28:33 +00001272class Agent(object):
showard77182562009-06-10 00:16:05 +00001273 """
Alex Miller47715eb2013-07-24 03:34:01 -07001274 An agent for use by the Dispatcher class to perform a task. An agent wraps
1275 around an AgentTask mainly to associate the AgentTask with the queue_entry
1276 and host ids.
showard77182562009-06-10 00:16:05 +00001277
1278 The following methods are required on all task objects:
1279 poll() - Called periodically to let the task check its status and
1280             update its internal state.
1281 is_done() - Returns True if the task is finished.
1282 abort() - Called when an abort has been requested. The task must
1283 set its aborted attribute to True if it actually aborted.
1284
1285 The following attributes are required on all task objects:
1286 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001287 success - bool, True if this task succeeded.
1288 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1289 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001290 """
1291
1292
showard418785b2009-11-23 20:19:59 +00001293 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001294 """
Alex Miller47715eb2013-07-24 03:34:01 -07001295 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001296 """
showard8cc058f2009-09-08 16:26:33 +00001297 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001298
showard77182562009-06-10 00:16:05 +00001299 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001300 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001301
showard8cc058f2009-09-08 16:26:33 +00001302 self.queue_entry_ids = task.queue_entry_ids
1303 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001304
showard8cc058f2009-09-08 16:26:33 +00001305 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001306 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001307
1308
jadmanski0afbb632008-06-06 21:10:57 +00001309 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001310 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001311 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001312 self.task.poll()
1313 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001314 self.finished = True
showardec113162008-05-08 00:52:49 +00001315
1316
jadmanski0afbb632008-06-06 21:10:57 +00001317 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001318 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001319
1320
showardd3dc1992009-04-22 21:01:40 +00001321 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001322 if self.task:
1323 self.task.abort()
1324 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001325 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001326 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001327
showardd3dc1992009-04-22 21:01:40 +00001328
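# How the pieces fit together (sketch): the dispatcher wraps every AgentTask
# in an Agent and drives it once per tick, roughly
#
#     agent = Agent(task)
#     dispatcher.add_agent(agent)     # fills in agent.dispatcher
#     # ...each cycle, from _handle_agents():
#     agent.tick()                    # starts the task, then polls it
#     if agent.is_done():
#         dispatcher.remove_agent(agent)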
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001329class BaseAgentTask(object):
showardd1195652009-12-08 22:21:02 +00001330 class _NullMonitor(object):
1331 pidfile_id = None
1332
1333 def has_process(self):
1334 return True
1335
1336
1337 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001338 """
showardd1195652009-12-08 22:21:02 +00001339 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001340 """
jadmanski0afbb632008-06-06 21:10:57 +00001341 self.done = False
showardd1195652009-12-08 22:21:02 +00001342 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001343 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001344 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001345 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001346 self.queue_entry_ids = []
1347 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001348 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001349
1350
1351 def _set_ids(self, host=None, queue_entries=None):
1352 if queue_entries and queue_entries != [None]:
1353 self.host_ids = [entry.host.id for entry in queue_entries]
1354 self.queue_entry_ids = [entry.id for entry in queue_entries]
1355 else:
1356 assert host
1357 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001358
1359
jadmanski0afbb632008-06-06 21:10:57 +00001360 def poll(self):
showard08a36412009-05-05 01:01:13 +00001361 if not self.started:
1362 self.start()
showardd1195652009-12-08 22:21:02 +00001363 if not self.done:
1364 self.tick()
showard08a36412009-05-05 01:01:13 +00001365
1366
1367 def tick(self):
showardd1195652009-12-08 22:21:02 +00001368 assert self.monitor
1369 exit_code = self.monitor.exit_code()
1370 if exit_code is None:
1371 return
mbligh36768f02008-02-22 18:28:33 +00001372
showardd1195652009-12-08 22:21:02 +00001373 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001374 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001375
1376
jadmanski0afbb632008-06-06 21:10:57 +00001377 def is_done(self):
1378 return self.done
mbligh36768f02008-02-22 18:28:33 +00001379
1380
jadmanski0afbb632008-06-06 21:10:57 +00001381 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001382 if self.done:
showardd1195652009-12-08 22:21:02 +00001383 assert self.started
showard08a36412009-05-05 01:01:13 +00001384 return
showardd1195652009-12-08 22:21:02 +00001385 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001386 self.done = True
1387 self.success = success
1388 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001389
1390
jadmanski0afbb632008-06-06 21:10:57 +00001391 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001392 """
1393 To be overridden.
1394 """
showarded2afea2009-07-07 20:54:07 +00001395 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001396 self.register_necessary_pidfiles()
1397
1398
1399 def _log_file(self):
1400 if not self._log_file_name:
1401 return None
1402 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001403
mbligh36768f02008-02-22 18:28:33 +00001404
jadmanski0afbb632008-06-06 21:10:57 +00001405 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001406 log_file = self._log_file()
1407 if self.monitor and log_file:
1408 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001409
1410
jadmanski0afbb632008-06-06 21:10:57 +00001411 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001412 """
1413 To be overridden.
1414 """
jadmanski0afbb632008-06-06 21:10:57 +00001415 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001416 logging.info("%s finished with success=%s", type(self).__name__,
1417 self.success)
1418
mbligh36768f02008-02-22 18:28:33 +00001419
jadmanski0afbb632008-06-06 21:10:57 +00001420 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001421 if not self.started:
1422 self.prolog()
1423 self.run()
1424
1425 self.started = True
1426
1427
1428 def abort(self):
1429 if self.monitor:
1430 self.monitor.kill()
1431 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001432 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001433 self.cleanup()
1434
1435
showarded2afea2009-07-07 20:54:07 +00001436 def _get_consistent_execution_path(self, execution_entries):
1437 first_execution_path = execution_entries[0].execution_path()
1438 for execution_entry in execution_entries[1:]:
1439 assert execution_entry.execution_path() == first_execution_path, (
1440 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1441 execution_entry,
1442 first_execution_path,
1443 execution_entries[0]))
1444 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001445
1446
showarded2afea2009-07-07 20:54:07 +00001447 def _copy_results(self, execution_entries, use_monitor=None):
1448 """
1449 @param execution_entries: list of objects with execution_path() method
1450 """
showard6d1c1432009-08-20 23:30:39 +00001451 if use_monitor is not None and not use_monitor.has_process():
1452 return
1453
showarded2afea2009-07-07 20:54:07 +00001454 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001455 if use_monitor is None:
1456 assert self.monitor
1457 use_monitor = self.monitor
1458 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001459 execution_path = self._get_consistent_execution_path(execution_entries)
1460 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001461 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001462
showarda1e74b32009-05-12 17:32:04 +00001463
1464 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001465 for queue_entry in queue_entries:
1466 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001467
1468
mbligh4608b002010-01-05 18:22:35 +00001469 def _archive_results(self, queue_entries):
1470 for queue_entry in queue_entries:
1471 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001472
1473
showardd1195652009-12-08 22:21:02 +00001474 def _command_line(self):
1475 """
1476 Return the command line to run. Must be overridden.
1477 """
1478 raise NotImplementedError
1479
1480
1481 @property
1482 def num_processes(self):
1483 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001484 Return the number of processes forked by this BaseAgentTask's process.
1485 It may only be approximate. To be overridden if necessary.
showardd1195652009-12-08 22:21:02 +00001486 """
1487 return 1
1488
1489
1490 def _paired_with_monitor(self):
1491 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001492 If this BaseAgentTask's process must run on the same machine as some
showardd1195652009-12-08 22:21:02 +00001493 previous process, this method should be overridden to return a
1494 PidfileRunMonitor for that process.
1495 """
1496 return self._NullMonitor()
1497
1498
1499 @property
1500 def owner_username(self):
1501 """
1502 Return login of user responsible for this task. May be None. Must be
1503 overridden.
1504 """
1505 raise NotImplementedError
1506
1507
1508 def _working_directory(self):
1509 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001510 Return the directory where this BaseAgentTask's process executes.
1511 Must be overridden.
showardd1195652009-12-08 22:21:02 +00001512 """
1513 raise NotImplementedError
1514
1515
1516 def _pidfile_name(self):
1517 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001518 Return the name of the pidfile this BaseAgentTask's process uses. To be
showardd1195652009-12-08 22:21:02 +00001519 overridden if necessary.
1520 """
jamesrenc44ae992010-02-19 00:12:54 +00001521 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001522
1523
1524 def _check_paired_results_exist(self):
1525 if not self._paired_with_monitor().has_process():
1526 email_manager.manager.enqueue_notify_email(
1527 'No paired results in task',
1528 'No paired results in task %s at %s'
1529 % (self, self._paired_with_monitor().pidfile_id))
1530 self.finished(False)
1531 return False
1532 return True
1533
1534
1535 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001536 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001537 self.monitor = PidfileRunMonitor()
1538
1539
1540 def run(self):
1541 if not self._check_paired_results_exist():
1542 return
1543
1544 self._create_monitor()
1545 self.monitor.run(
1546 self._command_line(), self._working_directory(),
1547 num_processes=self.num_processes,
1548 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1549 pidfile_name=self._pidfile_name(),
1550 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001551 username=self.owner_username,
1552 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1553
1554
1555 def get_drone_hostnames_allowed(self):
1556 if not models.DroneSet.drone_sets_enabled():
1557 return None
1558
1559 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1560 if not hqes:
1561 # Only special tasks could be missing host queue entries
1562 assert isinstance(self, SpecialAgentTask)
1563 return self._user_or_global_default_drone_set(
1564 self.task, self.task.requested_by)
1565
1566 job_ids = hqes.values_list('job', flat=True).distinct()
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001567 assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
jamesren76fcf192010-04-21 20:39:50 +00001568 "span multiple jobs")
1569
1570 job = models.Job.objects.get(id=job_ids[0])
1571 drone_set = job.drone_set
1572 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001573 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001574
1575 return drone_set.get_drone_hostnames()
1576
1577
1578 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1579 """
1580 Returns the user's default drone set, if present.
1581
1582 Otherwise, returns the global default drone set.
1583 """
1584 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1585 if not user:
1586 logging.warn('%s had no owner; using default drone set',
1587 obj_with_owner)
1588 return default_hostnames
1589 if not user.drone_set:
1590 logging.warn('User %s has no default drone set, using global '
1591 'default', user.login)
1592 return default_hostnames
1593 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001594
1595
1596 def register_necessary_pidfiles(self):
1597 pidfile_id = _drone_manager.get_pidfile_id_from(
1598 self._working_directory(), self._pidfile_name())
1599 _drone_manager.register_pidfile(pidfile_id)
1600
1601 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1602 if paired_pidfile_id:
1603 _drone_manager.register_pidfile(paired_pidfile_id)
1604
1605
1606 def recover(self):
1607 if not self._check_paired_results_exist():
1608 return
1609
1610 self._create_monitor()
1611 self.monitor.attach_to_existing_process(
1612 self._working_directory(), pidfile_name=self._pidfile_name(),
1613 num_processes=self.num_processes)
1614 if not self.monitor.has_process():
1615 # no process to recover; wait to be started normally
1616 self.monitor = None
1617 return
1618
1619 self.started = True
Aviv Keshet225bdfe2013-03-05 10:10:08 -08001620 logging.info('Recovering process %s for %s at %s',
1621 self.monitor.get_process(), type(self).__name__,
1622 self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001623
1624
mbligh4608b002010-01-05 18:22:35 +00001625 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1626 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001627 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001628 for entry in queue_entries:
1629 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001630 raise host_scheduler.SchedulerError(
1631 '%s attempting to start entry with invalid status %s: '
1632 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001633 invalid_host_status = (
1634 allowed_host_statuses is not None
1635 and entry.host.status not in allowed_host_statuses)
1636 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001637 raise host_scheduler.SchedulerError(
1638 '%s attempting to start on queue entry with invalid '
1639 'host status %s: %s'
1640 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001641
1642
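class _ExampleAgentTask(BaseAgentTask):
    """
    Illustrative sketch only -- the scheduler never instantiates this. It
    shows the minimal surface a concrete task implements on top of
    BaseAgentTask: a command line, a working directory and an owner. All
    values below are made up.
    """
    def __init__(self):
        super(_ExampleAgentTask, self).__init__(log_file_name='example.log')

    def _command_line(self):
        return ['true']

    def _working_directory(self):
        return 'example-owner/1-example'

    @property
    def owner_username(self):
        return 'example-owner'

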
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001643SiteAgentTask = utils.import_site_class(
1644 __file__, 'autotest_lib.scheduler.site_monitor_db',
1645 'SiteAgentTask', BaseAgentTask)
1646
1647class AgentTask(SiteAgentTask):
1648 pass
1649
1650
showardd9205182009-04-27 20:09:55 +00001651class TaskWithJobKeyvals(object):
1652 """AgentTask mixin providing functionality to help with job keyval files."""
1653 _KEYVAL_FILE = 'keyval'
1654 def _format_keyval(self, key, value):
1655 return '%s=%s' % (key, value)
1656
1657
1658 def _keyval_path(self):
1659 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001660 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001661
1662
1663 def _write_keyval_after_job(self, field, value):
1664 assert self.monitor
1665 if not self.monitor.has_process():
1666 return
1667 _drone_manager.write_lines_to_file(
1668 self._keyval_path(), [self._format_keyval(field, value)],
1669 paired_with_process=self.monitor.get_process())
1670
1671
1672 def _job_queued_keyval(self, job):
1673 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1674
1675
1676 def _write_job_finished(self):
1677 self._write_keyval_after_job("job_finished", int(time.time()))
1678
1679
showarddb502762009-09-09 15:31:20 +00001680 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1681 keyval_contents = '\n'.join(self._format_keyval(key, value)
1682 for key, value in keyval_dict.iteritems())
1683 # always end with a newline to allow additional keyvals to be written
1684 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001685 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001686 keyval_contents,
1687 file_path=keyval_path)
1688
1689
1690 def _write_keyvals_before_job(self, keyval_dict):
1691 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1692
1693
1694 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001695 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001696 host.hostname)
1697 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001698        all_labels = [urllib.quote(label) for label in all_labels]
showarddb502762009-09-09 15:31:20 +00001699 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1700 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1701
1702
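# Keyval files produced by the mixin above are plain 'key=value' lines, e.g.
# _write_keyvals_before_job({'job_queued': 1262304000}) attaches a file to
# the execution directory containing (timestamp made up):
#
#     job_queued=1262304000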
showard8cc058f2009-09-08 16:26:33 +00001703class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001704 """
1705 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1706 """
1707
1708 TASK_TYPE = None
1709 host = None
1710 queue_entry = None
1711
showardd1195652009-12-08 22:21:02 +00001712 def __init__(self, task, extra_command_args):
1713 super(SpecialAgentTask, self).__init__()
1714
lmrb7c5d272010-04-16 06:34:04 +00001715 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001716
jamesrenc44ae992010-02-19 00:12:54 +00001717 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001718 self.queue_entry = None
1719 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001720 self.queue_entry = scheduler_models.HostQueueEntry(
1721 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001722
showarded2afea2009-07-07 20:54:07 +00001723 self.task = task
1724 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001725
1726
showard8cc058f2009-09-08 16:26:33 +00001727 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001728 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1729
1730
1731 def _command_line(self):
1732 return _autoserv_command_line(self.host.hostname,
1733 self._extra_command_args,
1734 queue_entry=self.queue_entry)
1735
1736
1737 def _working_directory(self):
1738 return self.task.execution_path()
1739
1740
1741 @property
1742 def owner_username(self):
1743 if self.task.requested_by:
1744 return self.task.requested_by.login
1745 return None
showard8cc058f2009-09-08 16:26:33 +00001746
1747
showarded2afea2009-07-07 20:54:07 +00001748 def prolog(self):
1749 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001750 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001751 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001752
1753
showardde634ee2009-01-30 01:44:24 +00001754 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001755 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001756
showard2fe3f1d2009-07-06 20:19:11 +00001757 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001758 return # don't fail metahost entries, they'll be reassigned
1759
showard2fe3f1d2009-07-06 20:19:11 +00001760 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001761 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001762 return # entry has been aborted
1763
Alex Millerdfff2fd2013-05-28 13:05:06 -07001764 self._actually_fail_queue_entry()
1765
1766
1767 # TODO(milleral): http://crbug.com/268607
1768    # All of this used to be part of _fail_queue_entry. The exact
1769    # semantics of when a queue entry should and should not be failed
1770    # still need to be worked out: provisioning can want to fail a queue
1771    # entry that could be requeued, which trips the two guard checks
1772    # above, so _fail_queue_entry() exits early and has no effect.
1773    # _actually_fail_queue_entry is a hack that bypasses those checks and
1774    # unconditionally executes the failure path.
1776 def _actually_fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001777 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001778 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001779 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001780 self._write_keyval_after_job(queued_key, queued_time)
1781 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001782
showard8cc058f2009-09-08 16:26:33 +00001783 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001784 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001785 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001786 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001787
showard8cc058f2009-09-08 16:26:33 +00001788 pidfile_id = _drone_manager.get_pidfile_id_from(
1789 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001790 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001791 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001792
1793 if self.queue_entry.job.parse_failed_repair:
1794 self._parse_results([self.queue_entry])
1795 else:
1796 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001797
Alex Miller23676a22013-07-03 09:03:36 -07001798 # Also fail all other special tasks that have not yet run for this HQE
1799 pending_tasks = models.SpecialTask.objects.filter(
1800 queue_entry__id=self.queue_entry.id,
1801 is_complete=0)
Alex Miller5e36ccc2013-08-03 16:31:58 -07001802 for task in pending_tasks:
1803 task.finish(False)
Alex Miller23676a22013-07-03 09:03:36 -07001804
showard8cc058f2009-09-08 16:26:33 +00001805
1806 def cleanup(self):
1807 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001808
1809 # We will consider an aborted task to be "Failed"
1810 self.task.finish(bool(self.success))
1811
showardf85a0b72009-10-07 20:48:45 +00001812 if self.monitor:
1813 if self.monitor.has_process():
1814 self._copy_results([self.task])
1815 if self.monitor.pidfile_id is not None:
1816 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001817
1818
Dan Shi07e09af2013-04-12 09:31:29 -07001819 def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
1820        """Remove queued special tasks of the given type for this host.
1821
1822 @param special_task_to_remove: type of special task to be removed, e.g.,
1823 models.SpecialTask.Task.VERIFY.
1824        @param keep_last_one: True to exclude this task itself from the
1825                deletion, i.e. keep the most recent task of this type.
1826
1827 """
1828 queued_special_tasks = models.SpecialTask.objects.filter(
1829 host__id=self.host.id,
1830 task=special_task_to_remove,
1831 is_active=False, is_complete=False, queue_entry=None)
1832 if keep_last_one:
1833 queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
1834 queued_special_tasks.delete()
1835
1836
showard8cc058f2009-09-08 16:26:33 +00001837class RepairTask(SpecialAgentTask):
1838 TASK_TYPE = models.SpecialTask.Task.REPAIR
1839
1840
showardd1195652009-12-08 22:21:02 +00001841 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001842 """\
1843        @param task: a models.SpecialTask instance; if it carries a
1844                queue_entry, that entry is marked failed when this repair
1845                fails.
1844 """
1845 protection = host_protections.Protection.get_string(
1846 task.host.protection)
1847 # normalize the protection name
1848 protection = host_protections.Protection.get_attr_name(protection)
1849
1850 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001851 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001852
1853 # *don't* include the queue entry in IDs -- if the queue entry is
1854 # aborted, we want to leave the repair task running
1855 self._set_ids(host=self.host)
1856
1857
1858 def prolog(self):
1859 super(RepairTask, self).prolog()
1860        logging.info("repair_task starting for host: %s", self.host.hostname)
1861 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001862
1863
jadmanski0afbb632008-06-06 21:10:57 +00001864 def epilog(self):
1865 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001866
jadmanski0afbb632008-06-06 21:10:57 +00001867 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001868 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001869 else:
showard8cc058f2009-09-08 16:26:33 +00001870 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001871 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001872 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001873
1874
showarded2afea2009-07-07 20:54:07 +00001875class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001876 def _copy_to_results_repository(self):
1877 if not self.queue_entry or self.queue_entry.meta_host:
1878 return
1879
1880 self.queue_entry.set_execution_subdir()
1881 log_name = os.path.basename(self.task.execution_path())
1882 source = os.path.join(self.task.execution_path(), 'debug',
1883 'autoserv.DEBUG')
1884 destination = os.path.join(
1885 self.queue_entry.execution_path(), log_name)
1886
1887 self.monitor.try_copy_to_results_repository(
1888 source, destination_path=destination)
1889
1890
showard170873e2009-01-07 00:22:26 +00001891 def epilog(self):
1892 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001893
showard775300b2009-09-09 15:30:50 +00001894 if self.success:
1895 return
showard8fe93b52008-11-18 17:53:22 +00001896
showard775300b2009-09-09 15:30:50 +00001897 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001898 # effectively ignore failure for these hosts
1899 self.success = True
showard775300b2009-09-09 15:30:50 +00001900 return
1901
1902 if self.queue_entry:
Alex Millerf3f19452013-07-29 15:53:00 -07001903 # If we requeue a HQE, we should cancel any remaining pre-job
1904 # tasks against this host, otherwise we'll be left in a state
1905 # where a queued HQE has special tasks to run against a host.
1906 models.SpecialTask.objects.filter(
1907 queue_entry__id=self.queue_entry.id,
1908 host__id=self.host.id,
1909 is_complete=0).update(is_complete=1, success=0)
showard775300b2009-09-09 15:30:50 +00001910
Alex Millera4a78ef2013-09-03 21:23:05 -07001911 previous_provisions = models.SpecialTask.objects.filter(
1912 task=models.SpecialTask.Task.PROVISION,
1913 queue_entry_id=self.queue_entry.id).count()
Alex Miller7bcec082013-09-19 10:00:53 -07001914 if (previous_provisions >
Alex Millera4a78ef2013-09-03 21:23:05 -07001915 scheduler_config.config.max_provision_retries):
1916 self._actually_fail_queue_entry()
1917 # This abort will mark the aborted bit on the HQE itself, to
1918 # signify that we're killing it. Technically it also will do
1919 # the recursive aborting of all child jobs, but that shouldn't
1920 # matter here, as only suites have children, and those are
1921 # hostless and thus don't have provisioning.
1922 # TODO(milleral) http://crbug.com/188217
1923 # However, we can't actually do this yet, as if we set the
1924 # abort bit the FinalReparseTask will set the status of the HQE
1925 # to ABORTED, which then means that we don't show the status in
1926 # run_suite. So in the meantime, don't mark the HQE as
1927 # aborted.
1928 # queue_entry.abort()
1929 else:
1930 # requeue() must come after handling provision retries, since
1931 # _actually_fail_queue_entry needs an execution subdir.
1932 # We also don't want to requeue if we hit the provision retry
1933 # limit, since then we overwrite the PARSING state of the HQE.
1934 self.queue_entry.requeue()
1935
1936 previous_repairs = models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001937 task=models.SpecialTask.Task.REPAIR,
Alex Millera4a78ef2013-09-03 21:23:05 -07001938 queue_entry_id=self.queue_entry.id).count()
1939 if previous_repairs >= scheduler_config.config.max_repair_limit:
showard775300b2009-09-09 15:30:50 +00001940 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1941 self._fail_queue_entry()
1942 return
1943
showard9bb960b2009-11-19 01:02:11 +00001944 queue_entry = models.HostQueueEntry.objects.get(
1945 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001946 else:
1947 queue_entry = None
1948
1949 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001950 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001951 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001952 queue_entry=queue_entry,
1953 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001954
showard8fe93b52008-11-18 17:53:22 +00001955
Alex Miller42437f92013-05-28 12:58:54 -07001956 def _should_pending(self):
1957 """
1958 Decide if we should call the host queue entry's on_pending method.
1959 We should if:
1960 1) There exists an associated host queue entry.
1961 2) The current special task completed successfully.
1962 3) There do not exist any more special tasks to be run before the
1963 host queue entry starts.
1964
1965        @returns: True if we should call on_pending, False if not.
1966
1967 """
1968 if not self.queue_entry or not self.success:
1969 return False
1970
1971 # We know if this is the last one when we create it, so we could add
1972 # another column to the database to keep track of this information, but
1973 # I expect the overhead of querying here to be minimal.
1974 queue_entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1975 queued = models.SpecialTask.objects.filter(
1976 host__id=self.host.id, is_active=False,
1977 is_complete=False, queue_entry=queue_entry)
1978 queued = queued.exclude(id=self.task.id)
1979 return queued.count() == 0
1980
1981
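# In effect, _should_pending() asks (SQL sketch; table and column names per
# the afe models, ids made up):
#
#     SELECT COUNT(*) FROM afe_special_tasks
#      WHERE host_id = 7 AND queue_entry_id = 42
#        AND is_active = 0 AND is_complete = 0
#        AND id != <this task's id>;
#
# and on_pending() is called only when that count is zero -- i.e. this was
# the last special task gating the host queue entry.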
showard8fe93b52008-11-18 17:53:22 +00001982class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001983 TASK_TYPE = models.SpecialTask.Task.VERIFY
1984
1985
showardd1195652009-12-08 22:21:02 +00001986 def __init__(self, task):
1987 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001988 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001989
1990
jadmanski0afbb632008-06-06 21:10:57 +00001991 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001992 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001993
showardb18134f2009-03-20 20:52:18 +00001994 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001995 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001996 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1997 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001998
jamesren42318f72010-05-10 23:40:59 +00001999 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00002000 # and there's no need to keep records of other requests.
Dan Shi07e09af2013-04-12 09:31:29 -07002001 self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
2002 keep_last_one=True)
showard2fe3f1d2009-07-06 20:19:11 +00002003
mbligh36768f02008-02-22 18:28:33 +00002004
jadmanski0afbb632008-06-06 21:10:57 +00002005 def epilog(self):
2006 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002007 if self.success:
Alex Miller42437f92013-05-28 12:58:54 -07002008 if self._should_pending():
showard8cc058f2009-09-08 16:26:33 +00002009 self.queue_entry.on_pending()
2010 else:
2011 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00002012
2013
mbligh4608b002010-01-05 18:22:35 +00002014class CleanupTask(PreJobTask):
2015 # note this can also run post-job, but when it does, it's running standalone
2016 # against the host (not related to the job), so it's not considered a
2017 # PostJobTask
2018
2019 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2020
2021
2022 def __init__(self, task, recover_run_monitor=None):
2023 super(CleanupTask, self).__init__(task, ['--cleanup'])
2024 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
2025
2026
2027 def prolog(self):
2028 super(CleanupTask, self).prolog()
2029 logging.info("starting cleanup task for host: %s", self.host.hostname)
2030 self.host.set_status(models.Host.Status.CLEANING)
2031 if self.queue_entry:
Dan Shi07e09af2013-04-12 09:31:29 -07002032 self.queue_entry.set_status(models.HostQueueEntry.Status.CLEANING)
mbligh4608b002010-01-05 18:22:35 +00002033
2034
2035 def _finish_epilog(self):
2036 if not self.queue_entry or not self.success:
2037 return
2038
2039 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
2040 should_run_verify = (
2041 self.queue_entry.job.run_verify
2042 and self.host.protection != do_not_verify_protection)
2043 if should_run_verify:
2044 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
2045 models.SpecialTask.objects.create(
2046 host=models.Host.objects.get(id=self.host.id),
2047 queue_entry=entry,
2048 task=models.SpecialTask.Task.VERIFY)
2049 else:
Alex Miller42437f92013-05-28 12:58:54 -07002050 if self._should_pending():
2051 self.queue_entry.on_pending()
mbligh4608b002010-01-05 18:22:35 +00002052
2053
2054 def epilog(self):
2055 super(CleanupTask, self).epilog()
2056
2057 if self.success:
2058 self.host.update_field('dirty', 0)
2059 self.host.set_status(models.Host.Status.READY)
2060
2061 self._finish_epilog()
2062
2063
Dan Shi07e09af2013-04-12 09:31:29 -07002064class ResetTask(PreJobTask):
2065 """Task to reset a DUT, including cleanup and verify."""
2066 # note this can also run post-job, but when it does, it's running standalone
2067 # against the host (not related to the job), so it's not considered a
2068 # PostJobTask
2069
2070 TASK_TYPE = models.SpecialTask.Task.RESET
2071
2072
2073 def __init__(self, task, recover_run_monitor=None):
2074 super(ResetTask, self).__init__(task, ['--reset'])
2075 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
2076
2077
2078 def prolog(self):
2079 super(ResetTask, self).prolog()
2080 logging.info('starting reset task for host: %s',
2081 self.host.hostname)
2082 self.host.set_status(models.Host.Status.RESETTING)
2083 if self.queue_entry:
2084 self.queue_entry.set_status(models.HostQueueEntry.Status.RESETTING)
2085
2086 # Delete any queued cleanups for this host.
2087 self.remove_special_tasks(models.SpecialTask.Task.CLEANUP,
2088 keep_last_one=False)
2089
2090 # Delete any queued reverifies for this host.
2091 self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
2092 keep_last_one=False)
2093
2094 # Only one reset is needed.
2095 self.remove_special_tasks(models.SpecialTask.Task.RESET,
2096 keep_last_one=True)
2097
2098
2099 def epilog(self):
2100 super(ResetTask, self).epilog()
2101
2102 if self.success:
2103 self.host.update_field('dirty', 0)
Dan Shi07e09af2013-04-12 09:31:29 -07002104
Alex Millerba076c52013-07-11 10:11:48 -07002105 if self._should_pending():
Dan Shi07e09af2013-04-12 09:31:29 -07002106 self.queue_entry.on_pending()
Alex Millerdc608d52013-07-30 14:26:21 -07002107 else:
2108 self.host.set_status(models.Host.Status.READY)
Dan Shi07e09af2013-04-12 09:31:29 -07002109
2110
Alex Millerdfff2fd2013-05-28 13:05:06 -07002111class ProvisionTask(PreJobTask):
2112 TASK_TYPE = models.SpecialTask.Task.PROVISION
2113
2114 def __init__(self, task):
2115 # Provisioning requires that we be associated with a job/queue entry
2116 assert task.queue_entry, "No HQE associated with provision task!"
2117 # task.queue_entry is an afe model HostQueueEntry object.
2118 # self.queue_entry is a scheduler models HostQueueEntry object, but
2119 # it gets constructed and assigned in __init__, so it's not available
2120 # yet. Therefore, we're stuck pulling labels off of the afe model
2121 # so that we can pass the --provision args into the __init__ call.
2122 labels = {x.name for x in task.queue_entry.job.dependency_labels.all()}
2123 _, provisionable = provision.filter_labels(labels)
2124 extra_command_args = ['--provision', ','.join(provisionable)]
2125 super(ProvisionTask, self).__init__(task, extra_command_args)
2126 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
2127
2128
2129 def _command_line(self):
2130 # If we give queue_entry to _autoserv_command_line, then it will append
2131 # -c for this invocation if the queue_entry is a client side test. We
2132 # don't want that, as it messes with provisioning, so we just drop it
2133 # from the arguments here.
2134        # Note that we also don't verify job_repo_url as provisioning tasks are
2135 # required to stage whatever content we need, and the job itself will
2136 # force autotest to be staged if it isn't already.
2137 return _autoserv_command_line(self.host.hostname,
2138 self._extra_command_args)
2139
2140
2141 def prolog(self):
2142 super(ProvisionTask, self).prolog()
2143        # TODO: add a check for a previous provision task and abort if one exists.
2144 logging.info("starting provision task for host: %s", self.host.hostname)
2145 self.queue_entry.set_status(
2146 models.HostQueueEntry.Status.PROVISIONING)
2147 self.host.set_status(models.Host.Status.PROVISIONING)
2148
2149
2150 def epilog(self):
Alex Millera4a78ef2013-09-03 21:23:05 -07002151 super(ProvisionTask, self).epilog()
Alex Millerdfff2fd2013-05-28 13:05:06 -07002152
Alex Millera4a78ef2013-09-03 21:23:05 -07002153 if self._should_pending():
Alex Millerdfff2fd2013-05-28 13:05:06 -07002154 self.queue_entry.on_pending()
2155 else:
2156 self.host.set_status(models.Host.Status.READY)
2157
2158
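# Label handling in ProvisionTask.__init__ (sketch; label values made up):
#
#     labels = {'board:lumpy', 'pool:bvt',
#               'cros-version:lumpy-release/R30-4495.0.0'}
#     _, provisionable = provision.filter_labels(labels)
#     extra_command_args = ['--provision',
#                           'cros-version:lumpy-release/R30-4495.0.0']
#
# Only provisionable labels reach autoserv; plain dependency labels are left
# to the scheduler's host matching.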
showarda9545c02009-12-18 22:44:26 +00002159class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
2160 """
2161 Common functionality for QueueTask and HostlessQueueTask
2162 """
2163 def __init__(self, queue_entries):
2164 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00002165 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00002166 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00002167
2168
showard73ec0442009-02-07 02:05:20 +00002169 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00002170 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00002171
2172
jamesrenc44ae992010-02-19 00:12:54 +00002173 def _write_control_file(self, execution_path):
2174 control_path = _drone_manager.attach_file_to_execution(
2175 execution_path, self.job.control_file)
2176 return control_path
2177
2178
Aviv Keshet308e7362013-05-21 14:43:16 -07002179 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00002180 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00002181 execution_path = self.queue_entries[0].execution_path()
2182 control_path = self._write_control_file(execution_path)
2183 hostnames = ','.join(entry.host.hostname
2184 for entry in self.queue_entries
2185 if not entry.is_hostless())
2186
2187 execution_tag = self.queue_entries[0].execution_tag()
2188 params = _autoserv_command_line(
2189 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07002190 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00002191 _drone_manager.absolute_path(control_path)],
2192 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07002193 if self.job.is_image_update_job():
2194 params += ['--image', self.job.update_image_path]
2195
jamesrenc44ae992010-02-19 00:12:54 +00002196 return params
showardd1195652009-12-08 22:21:02 +00002197
2198
2199 @property
2200 def num_processes(self):
2201 return len(self.queue_entries)
2202
2203
2204 @property
2205 def owner_username(self):
2206 return self.job.owner
2207
2208
2209 def _working_directory(self):
2210 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00002211
2212
jadmanski0afbb632008-06-06 21:10:57 +00002213 def prolog(self):
showardd9205182009-04-27 20:09:55 +00002214 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00002215 keyval_dict = self.job.keyval_dict()
2216 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00002217 group_name = self.queue_entries[0].get_group_name()
2218 if group_name:
2219 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00002220 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00002221 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002222 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00002223 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00002224
2225
showard35162b02009-03-03 02:17:30 +00002226 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00002227 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00002228 _drone_manager.write_lines_to_file(error_file_path,
2229 [_LOST_PROCESS_ERROR])
2230
2231
showardd3dc1992009-04-22 21:01:40 +00002232 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00002233 if not self.monitor:
2234 return
2235
showardd9205182009-04-27 20:09:55 +00002236 self._write_job_finished()
2237
showard35162b02009-03-03 02:17:30 +00002238 if self.monitor.lost_process:
2239 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00002240
jadmanskif7fa2cc2008-10-01 14:13:23 +00002241
showardcbd74612008-11-19 21:42:02 +00002242 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00002243 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00002244 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00002245 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00002246 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00002247
2248
jadmanskif7fa2cc2008-10-01 14:13:23 +00002249 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00002250 if not self.monitor or not self.monitor.has_process():
2251 return
2252
jadmanskif7fa2cc2008-10-01 14:13:23 +00002253 # build up sets of all the aborted_by and aborted_on values
2254 aborted_by, aborted_on = set(), set()
2255 for queue_entry in self.queue_entries:
2256 if queue_entry.aborted_by:
2257 aborted_by.add(queue_entry.aborted_by)
2258 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
2259 aborted_on.add(t)
2260
2261 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00002262 # TODO(showard): this conditional is now obsolete, we just need to leave
2263 # it in temporarily for backwards compatibility over upgrades. delete
2264 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00002265 assert len(aborted_by) <= 1
2266 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00002267 aborted_by_value = aborted_by.pop()
2268 aborted_on_value = max(aborted_on)
2269 else:
2270 aborted_by_value = 'autotest_system'
2271 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00002272
showarda0382352009-02-11 23:36:43 +00002273 self._write_keyval_after_job("aborted_by", aborted_by_value)
2274 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00002275
showardcbd74612008-11-19 21:42:02 +00002276 aborted_on_string = str(datetime.datetime.fromtimestamp(
2277 aborted_on_value))
2278 self._write_status_comment('Job aborted by %s on %s' %
2279 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00002280
2281
jadmanski0afbb632008-06-06 21:10:57 +00002282 def abort(self):
showarda9545c02009-12-18 22:44:26 +00002283 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00002284 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00002285 self._finish_task()
showard21baa452008-10-21 00:08:39 +00002286
2287
jadmanski0afbb632008-06-06 21:10:57 +00002288 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00002289 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002290 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002291
2292
2293class QueueTask(AbstractQueueTask):
2294 def __init__(self, queue_entries):
2295 super(QueueTask, self).__init__(queue_entries)
2296 self._set_ids(queue_entries=queue_entries)
2297
2298
2299 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002300 self._check_queue_entry_statuses(
2301 self.queue_entries,
2302 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
2303 models.HostQueueEntry.Status.RUNNING),
2304 allowed_host_statuses=(models.Host.Status.PENDING,
2305 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00002306
2307 super(QueueTask, self).prolog()
2308
2309 for queue_entry in self.queue_entries:
2310 self._write_host_keyvals(queue_entry.host)
2311 queue_entry.host.set_status(models.Host.Status.RUNNING)
2312 queue_entry.host.update_field('dirty', 1)
showarda9545c02009-12-18 22:44:26 +00002313
2314
2315 def _finish_task(self):
2316 super(QueueTask, self)._finish_task()
2317
2318 for queue_entry in self.queue_entries:
2319 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00002320 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00002321
2322
Alex Miller9f01d5d2013-08-08 02:26:01 -07002323 def _command_line(self):
2324 invocation = super(QueueTask, self)._command_line()
2325 return invocation + ['--verify_job_repo_url']
2326
2327
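# Putting QueueTask._command_line() together (sketch; tag and paths made
# up): for execution tag '123-owner/group0' the parameters are roughly
#
#     _autoserv_command_line('host1,host2',
#                            ['-P', '123-owner/group0', '-n',
#                             '/abs/results/123-owner/group0/control'],
#                            job=job, verbose=False)
#
# plus ['--image', job.update_image_path] for image-update jobs, and
# finally QueueTask appends '--verify_job_repo_url'.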
Dan Shi1a189052013-10-28 14:41:35 -07002328class HostlessQueueTask(AbstractQueueTask):
    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        self.queue_entries[0].update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()
        self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)


class PostJobTask(AgentTask):
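    """Base class for tasks run against a job's results after autoserv exits.

    Subclasses supply the actual command via _generate_command(); this class
    attaches a PidfileRunMonitor to the original autoserv process so the
    final queue entry status can be derived from its exit code.
    """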
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        if _testing_mode:
            return 'true'
        return self._generate_command(
                _drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state:\n' +
                        '\n'.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]


class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
                queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0
        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0)
                or num_tests_failed > 0)
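        # in short: reboot on abort, on RebootAfter.ALWAYS, on a fully passing
        # IF_ALL_TESTS_PASSED job, or whenever any test failed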

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)


class SelfThrottledPostJobTask(PostJobTask):
2512 """
2513 Special AgentTask subclass that maintains its own global process limit.
2514 """
2515 _num_running_processes = 0
2516 # Last known limit of max processes, used to check whether
2517 # max processes config has been changed.
2518 _last_known_max_processes = 0
2519 # Whether an email should be sent to notifiy process limit being hit.
2520 _notification_on = True
2521 # Once process limit is hit, an email will be sent.
2522 # To prevent spams, do not send another email until
2523 # it drops to lower than the following level.
2524 REVIVE_NOTIFICATION_THRESHOLD = 0.80
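    # Example: with a max of 10 processes, notifications re-arm once the
    # running count drops below 8 (10 * 0.80).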


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1
        stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
                                      cls._num_running_processes)


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1
        stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
                                      cls._num_running_processes)


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
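        # self.monitor is set by AgentTask only once the underlying process
        # has actually been launched, so this doubles as a "has launched"
        # check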
        return bool(self.monitor)


    def tick(self):
        # override tick to keep trying to start until the process count goes
        # down and we can, at which point we revert to default behavior
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    @classmethod
    def _notify_process_limit_hit(cls):
        """Send an email to notify that the process limit has been hit."""
        if cls._notification_on:
            subject = '%s: hitting max process limit.' % cls.__name__
            message = ('Running processes/Max processes: %d/%d'
                       % (cls._num_running_processes, cls._max_processes()))
            email_manager.manager.enqueue_notify_email(subject, message)
            cls._notification_on = False


    @classmethod
    def _reset_notification_switch_if_necessary(cls):
        """Reset _notification_on if necessary.

        Set _notification_on to True in the following cases:
        1) The limit of max processes in the configuration changes;
        2) _notification_on is False and the number of running processes
           drops below the fraction of the limit defined by
           REVIVE_NOTIFICATION_THRESHOLD.

        """
        if cls._last_known_max_processes != cls._max_processes():
            cls._notification_on = True
            cls._last_known_max_processes = cls._max_processes()
            return
        percentage = float(cls._num_running_processes) / cls._max_processes()
        if (not cls._notification_on and
            percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
            cls._notification_on = True


    def _try_starting_process(self):
        self._reset_notification_switch_if_necessary()
        if not self._can_run_new_process():
            self._notify_process_limit_hit()
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()


class FinalReparseTask(SelfThrottledPostJobTask):
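    """Throttled task that runs the results parser over a job's results.

    The number of concurrent parser processes is capped by the
    max_parse_processes scheduler config value.
    """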
    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                results_dir]


    @property
    def num_processes(self):
        return 0 # don't include parser processes in accounting


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)


class ArchiveResultsTask(SelfThrottledPostJobTask):
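    """Throttled task that archives a job's results via autoserv.

    Runs the archive_results.control.srv control file against the existing
    results; concurrent transfers are capped by the max_transfer_processes
    scheduler config value.
    """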
    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(
                queue_entries, log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                             'archive_results.control.srv')]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            paired_process = self._paired_with_monitor().get_process()
            _drone_manager.write_lines_to_file(
                    failed_file, ['Archiving failed with exit code %s'
                                  % self.monitor.exit_code()],
                    paired_with_process=paired_process)
        self._set_all_statuses(self._final_status())


if __name__ == '__main__':
    main()