#!/usr/bin/python -u
#pylint: disable-msg=C0111

"""
Autotest scheduler
"""


import common
import datetime, optparse, os, signal
import sys, time, traceback, urllib
import logging, gc

from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.scheduler import scheduler_models
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_testing_mode = False
_drone_manager = None

def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60


def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why things the scheduler launches are
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
            logfile_name=log_name)


def handle_sigint(signum, frame):
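    """SIGINT handler: just set the shutdown flag.

    The flag is checked once per iteration of the main loop in
    main_without_exception_handling(), so shutdown happens at the end of the
    current tick rather than immediately.
    """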
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
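    """One-time startup for the scheduler process.

    Writes the monitor_db pid file (bailing out if another instance is
    already running), connects the scheduler's database handle, disables the
    read-only Django connection, installs the SIGINT handler, and initializes
    the drone manager from the 'drones' and 'results_host' config values.
    """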
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
                DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    autoserv_argv = [_autoserv_path, '-p',
                     '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        autoserv_argv += ['-m', machines]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
    if verbose:
        autoserv_argv.append('--verbose')
    return autoserv_argv + extra_args


class BaseDispatcher(object):
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def _log_extra_msg(self, msg):
        if self._extra_debugging:
            logging.debug(msg)


    def tick(self):
        """
        Run one scheduler cycle. When tick_debug is enabled, this logs when
        each major step begins so we can figure out where most of the tick
        time is being spent.
        """
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
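        """Periodically force a full garbage collection and log gc stats.

        Rate-limited by the gc_stats_interval_mins config value captured in
        self._seconds_between_garbage_stats; on most ticks this is a no-op.
        """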
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
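        """Reconnect to processes left running by a previous scheduler.

        Rebuilds AgentTasks for active queue entries and special tasks,
        re-attaches their pidfiles via the drone manager, reports any
        remaining orphaned autoserv processes, and schedules cleanup for
        active hosts that nothing recovered.
        """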
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
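        """Schedule a Cleanup special task for each matching host.

        Only unlocked, valid hosts matching `where` are considered; hosts
        that already have an agent or a pending special task are skipped.
        """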
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='afe_jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Look up the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
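        """Assign hosts to pending queue entries and start their pre-job work.

        Hostless entries go straight to a HostlessQueueTask, unassigned
        atomic-group entries are expanded across a host group, and everything
        else is matched to a host by the HostScheduler.
        """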
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        logging.debug('Processing %d queue_entries', len(queue_entries))
        for queue_entry in queue_entries:
            self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._log_extra_msg('Scheduling hostless job.')
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)


    def _schedule_running_host_queue_entries(self):
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
                            queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
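        """Abort queue entries flagged as aborted but not yet complete.

        Aborts any agents working on those entries, aborts the entries
        themselves, then gives each affected job a chance to stop its
        remaining entries.
        """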
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
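        """Throttling policy for starting new agents this cycle.

        Zero-process agents always run; once the limit has been hit no
        further nonzero-process agents start; otherwise both the drone
        manager's total-process limit and the per-cycle start limit apply,
        with an exception for the first agent started in a cycle.
        """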
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True


    def _handle_agents(self):
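        """Start eligible agents, tick running agents, and reap finished ones.

        Iterates over a copy of self._agents so agents can be removed as they
        complete; starting new agents is subject to _can_start_agent
        throttling.
        """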
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_cycle += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes. %d added this cycle.',
                     _drone_manager.total_running_processes(),
                     num_started_this_cycle)


    def _process_recurring_runs(self):
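        """Create jobs for RecurringRun entries whose start date has passed.

        Each due recurring run is resubmitted via the RPC utilities using its
        template job's options; the run is then deleted or rescheduled
        loop_period seconds later, depending on its remaining loop_count.
        """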
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


SiteDispatcher = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    pass


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, num_processes, nice_level=None,
            log_file=None, pidfile_name=None, paired_with_pidfile=None,
            username=None, drone_hostnames_allowed=None):
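        """Start `command` via the drone manager and remember its pidfile id.

        The command is optionally wrapped with `nice`, and the resulting
        pidfile id is what the rest of this class polls for status.
        """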
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            num_processes=num_processes, log_file=log_file,
            paired_with_pidfile=paired_with_pidfile, username=username,
            drone_hostnames_allowed=drone_hostnames_allowed)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=drone_manager.AUTOSERV_PID_FILE,
                                   num_processes=None):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
                execution_path, pidfile_name=pidfile_name)
        if num_processes is not None:
            _drone_manager.declare_process_count(self.pidfile_id, num_processes)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
         pid=None, exit_status=None if autoserv has not yet run
         pid!=None, exit_status=None if autoserv is running
         pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        if time.time() - self._start_time > _get_pidfile_timeout_secs():
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """@returns The number of tests that failed or -1 if unknown."""
        self._get_pidfile_info()
        if self._state.num_tests_failed is None:
            return -1
        return self._state.num_tests_failed


    def try_copy_results_on_drone(self, **kwargs):
        if self.has_process():
            # copy results logs into the normal place for job results
            _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)


    def try_copy_to_results_repository(self, source, **kwargs):
        if self.has_process():
            _drone_manager.copy_to_results_repository(self.get_process(),
                                                      source, **kwargs)


class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested. The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: A task as described in the class docstring.
        """
        self.task = task

        # This is filled in by Dispatcher.add_agent_task()
jadmanski0afbb632008-06-06 21:10:57 +00001088 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001089
showard8cc058f2009-09-08 16:26:33 +00001090 self.queue_entry_ids = task.queue_entry_ids
1091 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001092
showard8cc058f2009-09-08 16:26:33 +00001093 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001094 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001095
1096
jadmanski0afbb632008-06-06 21:10:57 +00001097 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001098 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001099 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001100 self.task.poll()
1101 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001102 self.finished = True
showardec113162008-05-08 00:52:49 +00001103
1104
jadmanski0afbb632008-06-06 21:10:57 +00001105 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001106 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001107
1108
showardd3dc1992009-04-22 21:01:40 +00001109 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001110 if self.task:
1111 self.task.abort()
1112 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001113 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001114 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001115
showardd3dc1992009-04-22 21:01:40 +00001116
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001117class BaseAgentTask(object):
showardd1195652009-12-08 22:21:02 +00001118 class _NullMonitor(object):
1119 pidfile_id = None
1120
1121 def has_process(self):
1122 return True
1123
1124
1125 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001126 """
showardd1195652009-12-08 22:21:02 +00001127 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001128 """
jadmanski0afbb632008-06-06 21:10:57 +00001129 self.done = False
showardd1195652009-12-08 22:21:02 +00001130 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001131 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001132 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001133 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001134 self.queue_entry_ids = []
1135 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001136 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001137
1138
1139 def _set_ids(self, host=None, queue_entries=None):
1140 if queue_entries and queue_entries != [None]:
1141 self.host_ids = [entry.host.id for entry in queue_entries]
1142 self.queue_entry_ids = [entry.id for entry in queue_entries]
1143 else:
1144 assert host
1145 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001146
1147
jadmanski0afbb632008-06-06 21:10:57 +00001148 def poll(self):
showard08a36412009-05-05 01:01:13 +00001149 if not self.started:
1150 self.start()
showardd1195652009-12-08 22:21:02 +00001151 if not self.done:
1152 self.tick()
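    # Lifecycle note: poll() first ensures the task has started (running
    # prolog() and run() once), then ticks it; tick() waits for the monitored
    # process's exit code and, once it is available, calls finished(), which
    # records success and invokes epilog().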
showard08a36412009-05-05 01:01:13 +00001153
1154
1155 def tick(self):
showardd1195652009-12-08 22:21:02 +00001156 assert self.monitor
1157 exit_code = self.monitor.exit_code()
1158 if exit_code is None:
1159 return
mbligh36768f02008-02-22 18:28:33 +00001160
showardd1195652009-12-08 22:21:02 +00001161 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001162 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001163
1164
jadmanski0afbb632008-06-06 21:10:57 +00001165 def is_done(self):
1166 return self.done
mbligh36768f02008-02-22 18:28:33 +00001167
1168
jadmanski0afbb632008-06-06 21:10:57 +00001169 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001170 if self.done:
showardd1195652009-12-08 22:21:02 +00001171 assert self.started
showard08a36412009-05-05 01:01:13 +00001172 return
showardd1195652009-12-08 22:21:02 +00001173 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001174 self.done = True
1175 self.success = success
1176 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001177
1178
jadmanski0afbb632008-06-06 21:10:57 +00001179 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001180 """
1181 To be overridden.
1182 """
showarded2afea2009-07-07 20:54:07 +00001183 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001184 self.register_necessary_pidfiles()
1185
1186
1187 def _log_file(self):
1188 if not self._log_file_name:
1189 return None
1190 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001191
mbligh36768f02008-02-22 18:28:33 +00001192
jadmanski0afbb632008-06-06 21:10:57 +00001193 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001194 log_file = self._log_file()
1195 if self.monitor and log_file:
1196 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001197
1198
jadmanski0afbb632008-06-06 21:10:57 +00001199 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001200 """
1201 To be overridden.
1202 """
jadmanski0afbb632008-06-06 21:10:57 +00001203 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001204 logging.info("%s finished with success=%s", type(self).__name__,
1205 self.success)
1206
mbligh36768f02008-02-22 18:28:33 +00001207
1208
jadmanski0afbb632008-06-06 21:10:57 +00001209 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001210 if not self.started:
1211 self.prolog()
1212 self.run()
1213
1214 self.started = True
1215
1216
1217 def abort(self):
1218 if self.monitor:
1219 self.monitor.kill()
1220 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001221 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001222 self.cleanup()
1223
1224
showarded2afea2009-07-07 20:54:07 +00001225 def _get_consistent_execution_path(self, execution_entries):
1226 first_execution_path = execution_entries[0].execution_path()
1227 for execution_entry in execution_entries[1:]:
1228 assert execution_entry.execution_path() == first_execution_path, (
1229 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1230 execution_entry,
1231 first_execution_path,
1232 execution_entries[0]))
1233 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001234
1235
showarded2afea2009-07-07 20:54:07 +00001236 def _copy_results(self, execution_entries, use_monitor=None):
1237 """
1238 @param execution_entries: list of objects with execution_path() method
1239 """
showard6d1c1432009-08-20 23:30:39 +00001240 if use_monitor is not None and not use_monitor.has_process():
1241 return
1242
showarded2afea2009-07-07 20:54:07 +00001243 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001244 if use_monitor is None:
1245 assert self.monitor
1246 use_monitor = self.monitor
1247 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001248 execution_path = self._get_consistent_execution_path(execution_entries)
1249 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001250 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001251
showarda1e74b32009-05-12 17:32:04 +00001252
1253 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001254 for queue_entry in queue_entries:
1255 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001256
1257
mbligh4608b002010-01-05 18:22:35 +00001258 def _archive_results(self, queue_entries):
1259 for queue_entry in queue_entries:
1260 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001261
1262
showardd1195652009-12-08 22:21:02 +00001263 def _command_line(self):
1264 """
1265 Return the command line to run. Must be overridden.
1266 """
1267 raise NotImplementedError
1268
1269
1270 @property
1271 def num_processes(self):
1272 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001273 Return the number of processes forked by this BaseAgentTask's process.
1274 It may only be approximate. To be overridden if necessary.
showardd1195652009-12-08 22:21:02 +00001275 """
1276 return 1
1277
1278
1279 def _paired_with_monitor(self):
1280 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001281 If this BaseAgentTask's process must run on the same machine as some
showardd1195652009-12-08 22:21:02 +00001282 previous process, this method should be overridden to return a
1283 PidfileRunMonitor for that process.
1284 """
1285 return self._NullMonitor()
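    # A minimal sketch of an override, assuming a task whose process must
    # share a drone with an earlier autoserv run (PostJobTask below does
    # essentially this with its _autoserv_monitor):
    #
    #   def _paired_with_monitor(self):
    #       return self._autoserv_monitor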
1286
1287
1288 @property
1289 def owner_username(self):
1290 """
1291 Return login of user responsible for this task. May be None. Must be
1292 overridden.
1293 """
1294 raise NotImplementedError
1295
1296
1297 def _working_directory(self):
1298 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001299 Return the directory where this BaseAgentTask's process executes.
1300 Must be overridden.
showardd1195652009-12-08 22:21:02 +00001301 """
1302 raise NotImplementedError
1303
1304
1305 def _pidfile_name(self):
1306 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001307 Return the name of the pidfile this BaseAgentTask's process uses. To be
showardd1195652009-12-08 22:21:02 +00001308 overridden if necessary.
1309 """
jamesrenc44ae992010-02-19 00:12:54 +00001310 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001311
1312
1313 def _check_paired_results_exist(self):
1314 if not self._paired_with_monitor().has_process():
1315 email_manager.manager.enqueue_notify_email(
1316 'No paired results in task',
1317 'No paired results in task %s at %s'
1318 % (self, self._paired_with_monitor().pidfile_id))
1319 self.finished(False)
1320 return False
1321 return True
1322
1323
1324 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001325 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001326 self.monitor = PidfileRunMonitor()
1327
1328
1329 def run(self):
1330 if not self._check_paired_results_exist():
1331 return
1332
1333 self._create_monitor()
1334 self.monitor.run(
1335 self._command_line(), self._working_directory(),
1336 num_processes=self.num_processes,
1337 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1338 pidfile_name=self._pidfile_name(),
1339 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001340 username=self.owner_username,
1341 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1342
1343
1344 def get_drone_hostnames_allowed(self):
1345 if not models.DroneSet.drone_sets_enabled():
1346 return None
1347
1348 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1349 if not hqes:
1350 # Only special tasks could be missing host queue entries
1351 assert isinstance(self, SpecialAgentTask)
1352 return self._user_or_global_default_drone_set(
1353 self.task, self.task.requested_by)
1354
1355 job_ids = hqes.values_list('job', flat=True).distinct()
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001356 assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
jamesren76fcf192010-04-21 20:39:50 +00001357 "span multiple jobs")
1358
1359 job = models.Job.objects.get(id=job_ids[0])
1360 drone_set = job.drone_set
1361 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001362 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001363
1364 return drone_set.get_drone_hostnames()
1365
1366
1367 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1368 """
1369 Returns the user's default drone set, if present.
1370
1371 Otherwise, returns the global default drone set.
1372 """
1373 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1374 if not user:
1375 logging.warn('%s had no owner; using default drone set',
1376 obj_with_owner)
1377 return default_hostnames
1378 if not user.drone_set:
1379 logging.warn('User %s has no default drone set, using global '
1380 'default', user.login)
1381 return default_hostnames
1382 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001383
1384
1385 def register_necessary_pidfiles(self):
1386 pidfile_id = _drone_manager.get_pidfile_id_from(
1387 self._working_directory(), self._pidfile_name())
1388 _drone_manager.register_pidfile(pidfile_id)
1389
1390 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1391 if paired_pidfile_id:
1392 _drone_manager.register_pidfile(paired_pidfile_id)
1393
1394
1395 def recover(self):
1396 if not self._check_paired_results_exist():
1397 return
1398
1399 self._create_monitor()
1400 self.monitor.attach_to_existing_process(
1401 self._working_directory(), pidfile_name=self._pidfile_name(),
1402 num_processes=self.num_processes)
1403 if not self.monitor.has_process():
1404 # no process to recover; wait to be started normally
1405 self.monitor = None
1406 return
1407
1408 self.started = True
Aviv Keshet225bdfe2013-03-05 10:10:08 -08001409 logging.info('Recovering process %s for %s at %s',
1410 self.monitor.get_process(), type(self).__name__,
1411 self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001412
1413
mbligh4608b002010-01-05 18:22:35 +00001414 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1415 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001416 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001417 for entry in queue_entries:
1418 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001419 raise host_scheduler.SchedulerError(
1420 '%s attempting to start entry with invalid status %s: '
1421 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001422 invalid_host_status = (
1423 allowed_host_statuses is not None
1424 and entry.host.status not in allowed_host_statuses)
1425 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001426 raise host_scheduler.SchedulerError(
1427 '%s attempting to start on queue entry with invalid '
1428 'host status %s: %s'
1429 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001430
1431
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001432SiteAgentTask = utils.import_site_class(
1433 __file__, 'autotest_lib.scheduler.site_monitor_db',
1434 'SiteAgentTask', BaseAgentTask)
1435
1436class AgentTask(SiteAgentTask):
1437 pass
1438
1439
showardd9205182009-04-27 20:09:55 +00001440class TaskWithJobKeyvals(object):
1441 """AgentTask mixin providing functionality to help with job keyval files."""
1442 _KEYVAL_FILE = 'keyval'
1443 def _format_keyval(self, key, value):
1444 return '%s=%s' % (key, value)
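    # For reference, the keyval files written by the helpers below are just
    # newline-terminated key=value lines, e.g. (hypothetical values):
    #
    #   job_queued=1267567890
    #   job_finished=1267571490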
1445
1446
1447 def _keyval_path(self):
1448 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001449 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001450
1451
1452 def _write_keyval_after_job(self, field, value):
1453 assert self.monitor
1454 if not self.monitor.has_process():
1455 return
1456 _drone_manager.write_lines_to_file(
1457 self._keyval_path(), [self._format_keyval(field, value)],
1458 paired_with_process=self.monitor.get_process())
1459
1460
1461 def _job_queued_keyval(self, job):
1462 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1463
1464
1465 def _write_job_finished(self):
1466 self._write_keyval_after_job("job_finished", int(time.time()))
1467
1468
showarddb502762009-09-09 15:31:20 +00001469 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1470 keyval_contents = '\n'.join(self._format_keyval(key, value)
1471 for key, value in keyval_dict.iteritems())
1472 # always end with a newline to allow additional keyvals to be written
1473 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001474 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001475 keyval_contents,
1476 file_path=keyval_path)
1477
1478
1479 def _write_keyvals_before_job(self, keyval_dict):
1480 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1481
1482
1483 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001484 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001485 host.hostname)
1486 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001487 all_labels = [ urllib.quote(label) for label in all_labels ]
showarddb502762009-09-09 15:31:20 +00001488 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1489 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1490
1491
showard8cc058f2009-09-08 16:26:33 +00001492class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001493 """
1494 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1495 """
1496
1497 TASK_TYPE = None
1498 host = None
1499 queue_entry = None
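    # Concrete subclasses (RepairTask, VerifyTask and CleanupTask below) set
    # TASK_TYPE to the matching models.SpecialTask.Task value and pass the
    # corresponding autoserv flags via extra_command_args.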
1500
showardd1195652009-12-08 22:21:02 +00001501 def __init__(self, task, extra_command_args):
1502 super(SpecialAgentTask, self).__init__()
1503
lmrb7c5d272010-04-16 06:34:04 +00001504 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001505
jamesrenc44ae992010-02-19 00:12:54 +00001506 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001507 self.queue_entry = None
1508 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001509 self.queue_entry = scheduler_models.HostQueueEntry(
1510 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001511
showarded2afea2009-07-07 20:54:07 +00001512 self.task = task
1513 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001514
1515
showard8cc058f2009-09-08 16:26:33 +00001516 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001517 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1518
1519
1520 def _command_line(self):
1521 return _autoserv_command_line(self.host.hostname,
1522 self._extra_command_args,
1523 queue_entry=self.queue_entry)
1524
1525
1526 def _working_directory(self):
1527 return self.task.execution_path()
1528
1529
1530 @property
1531 def owner_username(self):
1532 if self.task.requested_by:
1533 return self.task.requested_by.login
1534 return None
showard8cc058f2009-09-08 16:26:33 +00001535
1536
showarded2afea2009-07-07 20:54:07 +00001537 def prolog(self):
1538 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001539 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001540 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001541
1542
showardde634ee2009-01-30 01:44:24 +00001543 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001544 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001545
showard2fe3f1d2009-07-06 20:19:11 +00001546 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001547 return # don't fail metahost entries, they'll be reassigned
1548
showard2fe3f1d2009-07-06 20:19:11 +00001549 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001550 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001551 return # entry has been aborted
1552
showard2fe3f1d2009-07-06 20:19:11 +00001553 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001554 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001555 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001556 self._write_keyval_after_job(queued_key, queued_time)
1557 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001558
showard8cc058f2009-09-08 16:26:33 +00001559 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001560 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001561 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001562 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001563
showard8cc058f2009-09-08 16:26:33 +00001564 pidfile_id = _drone_manager.get_pidfile_id_from(
1565 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001566 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001567 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001568
1569 if self.queue_entry.job.parse_failed_repair:
1570 self._parse_results([self.queue_entry])
1571 else:
1572 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001573
1574
1575 def cleanup(self):
1576 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001577
1578 # We will consider an aborted task to be "Failed"
1579 self.task.finish(bool(self.success))
1580
showardf85a0b72009-10-07 20:48:45 +00001581 if self.monitor:
1582 if self.monitor.has_process():
1583 self._copy_results([self.task])
1584 if self.monitor.pidfile_id is not None:
1585 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001586
1587
1588class RepairTask(SpecialAgentTask):
1589 TASK_TYPE = models.SpecialTask.Task.REPAIR
1590
1591
showardd1195652009-12-08 22:21:02 +00001592 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001593 """\
1594        @param task: SpecialTask to run; if it has an associated queue entry, that entry is marked failed when this repair fails.
1595 """
1596 protection = host_protections.Protection.get_string(
1597 task.host.protection)
1598 # normalize the protection name
1599 protection = host_protections.Protection.get_attr_name(protection)
1600
1601 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001602 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001603
1604 # *don't* include the queue entry in IDs -- if the queue entry is
1605 # aborted, we want to leave the repair task running
1606 self._set_ids(host=self.host)
1607
1608
1609 def prolog(self):
1610 super(RepairTask, self).prolog()
1611 logging.info("repair_task starting")
1612 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001613
1614
jadmanski0afbb632008-06-06 21:10:57 +00001615 def epilog(self):
1616 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001617
jadmanski0afbb632008-06-06 21:10:57 +00001618 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001619 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001620 else:
showard8cc058f2009-09-08 16:26:33 +00001621 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001622 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001623 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001624
1625
showarded2afea2009-07-07 20:54:07 +00001626class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001627 def _copy_to_results_repository(self):
1628 if not self.queue_entry or self.queue_entry.meta_host:
1629 return
1630
1631 self.queue_entry.set_execution_subdir()
1632 log_name = os.path.basename(self.task.execution_path())
1633 source = os.path.join(self.task.execution_path(), 'debug',
1634 'autoserv.DEBUG')
1635 destination = os.path.join(
1636 self.queue_entry.execution_path(), log_name)
1637
1638 self.monitor.try_copy_to_results_repository(
1639 source, destination_path=destination)
1640
1641
showard170873e2009-01-07 00:22:26 +00001642 def epilog(self):
1643 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001644
showard775300b2009-09-09 15:30:50 +00001645 if self.success:
1646 return
showard8fe93b52008-11-18 17:53:22 +00001647
showard775300b2009-09-09 15:30:50 +00001648 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001649
showard775300b2009-09-09 15:30:50 +00001650 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001651 # effectively ignore failure for these hosts
1652 self.success = True
showard775300b2009-09-09 15:30:50 +00001653 return
1654
1655 if self.queue_entry:
1656 self.queue_entry.requeue()
1657
1658 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001659 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001660 queue_entry__id=self.queue_entry.id):
1661 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1662 self._fail_queue_entry()
1663 return
1664
showard9bb960b2009-11-19 01:02:11 +00001665 queue_entry = models.HostQueueEntry.objects.get(
1666 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001667 else:
1668 queue_entry = None
1669
1670 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001671 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001672 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001673 queue_entry=queue_entry,
1674 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001675
showard8fe93b52008-11-18 17:53:22 +00001676
1677class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001678 TASK_TYPE = models.SpecialTask.Task.VERIFY
1679
1680
showardd1195652009-12-08 22:21:02 +00001681 def __init__(self, task):
1682 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001683 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001684
1685
jadmanski0afbb632008-06-06 21:10:57 +00001686 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001687 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001688
showardb18134f2009-03-20 20:52:18 +00001689 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001690 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001691 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1692 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001693
jamesren42318f72010-05-10 23:40:59 +00001694 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00001695 # and there's no need to keep records of other requests.
1696 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00001697 host__id=self.host.id,
1698 task=models.SpecialTask.Task.VERIFY,
jamesren42318f72010-05-10 23:40:59 +00001699 is_active=False, is_complete=False, queue_entry=None)
showarded2afea2009-07-07 20:54:07 +00001700 queued_verifies = queued_verifies.exclude(id=self.task.id)
1701 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00001702
mbligh36768f02008-02-22 18:28:33 +00001703
jadmanski0afbb632008-06-06 21:10:57 +00001704 def epilog(self):
1705 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001706 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001707 if self.queue_entry:
1708 self.queue_entry.on_pending()
1709 else:
1710 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001711
1712
mbligh4608b002010-01-05 18:22:35 +00001713class CleanupTask(PreJobTask):
1714 # note this can also run post-job, but when it does, it's running standalone
1715 # against the host (not related to the job), so it's not considered a
1716 # PostJobTask
1717
1718 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1719
1720
1721 def __init__(self, task, recover_run_monitor=None):
1722 super(CleanupTask, self).__init__(task, ['--cleanup'])
1723 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1724
1725
1726 def prolog(self):
1727 super(CleanupTask, self).prolog()
1728 logging.info("starting cleanup task for host: %s", self.host.hostname)
1729 self.host.set_status(models.Host.Status.CLEANING)
1730 if self.queue_entry:
1731 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1732
1733
1734 def _finish_epilog(self):
1735 if not self.queue_entry or not self.success:
1736 return
1737
1738 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
1739 should_run_verify = (
1740 self.queue_entry.job.run_verify
1741 and self.host.protection != do_not_verify_protection)
1742 if should_run_verify:
1743 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1744 models.SpecialTask.objects.create(
1745 host=models.Host.objects.get(id=self.host.id),
1746 queue_entry=entry,
1747 task=models.SpecialTask.Task.VERIFY)
1748 else:
1749 self.queue_entry.on_pending()
1750
1751
1752 def epilog(self):
1753 super(CleanupTask, self).epilog()
1754
1755 if self.success:
1756 self.host.update_field('dirty', 0)
1757 self.host.set_status(models.Host.Status.READY)
1758
1759 self._finish_epilog()
1760
1761
showarda9545c02009-12-18 22:44:26 +00001762class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
1763 """
1764 Common functionality for QueueTask and HostlessQueueTask
1765 """
1766 def __init__(self, queue_entries):
1767 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001768 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001769 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001770
1771
showard73ec0442009-02-07 02:05:20 +00001772 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001773 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001774
1775
jamesrenc44ae992010-02-19 00:12:54 +00001776 def _write_control_file(self, execution_path):
1777 control_path = _drone_manager.attach_file_to_execution(
1778 execution_path, self.job.control_file)
1779 return control_path
1780
1781
showardd1195652009-12-08 22:21:02 +00001782 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001783 execution_path = self.queue_entries[0].execution_path()
1784 control_path = self._write_control_file(execution_path)
1785 hostnames = ','.join(entry.host.hostname
1786 for entry in self.queue_entries
1787 if not entry.is_hostless())
1788
1789 execution_tag = self.queue_entries[0].execution_tag()
1790 params = _autoserv_command_line(
1791 hostnames,
1792 ['-P', execution_tag, '-n',
1793 _drone_manager.absolute_path(control_path)],
1794 job=self.job, verbose=False)
1795
1796 if not self.job.is_server_job():
1797 params.append('-c')
1798
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001799 if self.job.is_image_update_job():
1800 params += ['--image', self.job.update_image_path]
1801
jamesrenc44ae992010-02-19 00:12:54 +00001802 return params
showardd1195652009-12-08 22:21:02 +00001803
1804
1805 @property
1806 def num_processes(self):
1807 return len(self.queue_entries)
1808
1809
1810 @property
1811 def owner_username(self):
1812 return self.job.owner
1813
1814
1815 def _working_directory(self):
1816 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001817
1818
jadmanski0afbb632008-06-06 21:10:57 +00001819 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001820 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001821 keyval_dict = self.job.keyval_dict()
1822 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001823 group_name = self.queue_entries[0].get_group_name()
1824 if group_name:
1825 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001826 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001827 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001828 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001829 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001830
1831
showard35162b02009-03-03 02:17:30 +00001832 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001833 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001834 _drone_manager.write_lines_to_file(error_file_path,
1835 [_LOST_PROCESS_ERROR])
1836
1837
showardd3dc1992009-04-22 21:01:40 +00001838 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001839 if not self.monitor:
1840 return
1841
showardd9205182009-04-27 20:09:55 +00001842 self._write_job_finished()
1843
showard35162b02009-03-03 02:17:30 +00001844 if self.monitor.lost_process:
1845 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001846
jadmanskif7fa2cc2008-10-01 14:13:23 +00001847
showardcbd74612008-11-19 21:42:02 +00001848 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001849 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001850 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001851 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001852 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001853
1854
jadmanskif7fa2cc2008-10-01 14:13:23 +00001855 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001856 if not self.monitor or not self.monitor.has_process():
1857 return
1858
jadmanskif7fa2cc2008-10-01 14:13:23 +00001859 # build up sets of all the aborted_by and aborted_on values
1860 aborted_by, aborted_on = set(), set()
1861 for queue_entry in self.queue_entries:
1862 if queue_entry.aborted_by:
1863 aborted_by.add(queue_entry.aborted_by)
1864 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1865 aborted_on.add(t)
1866
1867 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001868 # TODO(showard): this conditional is now obsolete, we just need to leave
1869 # it in temporarily for backwards compatibility over upgrades. delete
1870 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001871 assert len(aborted_by) <= 1
1872 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001873 aborted_by_value = aborted_by.pop()
1874 aborted_on_value = max(aborted_on)
1875 else:
1876 aborted_by_value = 'autotest_system'
1877 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001878
showarda0382352009-02-11 23:36:43 +00001879 self._write_keyval_after_job("aborted_by", aborted_by_value)
1880 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001881
showardcbd74612008-11-19 21:42:02 +00001882 aborted_on_string = str(datetime.datetime.fromtimestamp(
1883 aborted_on_value))
1884 self._write_status_comment('Job aborted by %s on %s' %
1885 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001886
1887
jadmanski0afbb632008-06-06 21:10:57 +00001888 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001889 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001890 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001891 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001892
1893
jadmanski0afbb632008-06-06 21:10:57 +00001894 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001895 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001896 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001897
1898
1899class QueueTask(AbstractQueueTask):
1900 def __init__(self, queue_entries):
1901 super(QueueTask, self).__init__(queue_entries)
1902 self._set_ids(queue_entries=queue_entries)
1903
1904
1905 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001906 self._check_queue_entry_statuses(
1907 self.queue_entries,
1908 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
1909 models.HostQueueEntry.Status.RUNNING),
1910 allowed_host_statuses=(models.Host.Status.PENDING,
1911 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00001912
1913 super(QueueTask, self).prolog()
1914
1915 for queue_entry in self.queue_entries:
1916 self._write_host_keyvals(queue_entry.host)
1917 queue_entry.host.set_status(models.Host.Status.RUNNING)
1918 queue_entry.host.update_field('dirty', 1)
1919 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
1920 # TODO(gps): Remove this if nothing needs it anymore.
1921 # A potential user is: tko/parser
1922 self.job.write_to_machines_file(self.queue_entries[0])
1923
1924
1925 def _finish_task(self):
1926 super(QueueTask, self)._finish_task()
1927
1928 for queue_entry in self.queue_entries:
1929 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00001930 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00001931
1932
mbligh4608b002010-01-05 18:22:35 +00001933class HostlessQueueTask(AbstractQueueTask):
1934 def __init__(self, queue_entry):
1935 super(HostlessQueueTask, self).__init__([queue_entry])
1936 self.queue_entry_ids = [queue_entry.id]
1937
1938
1939 def prolog(self):
1940 self.queue_entries[0].update_field('execution_subdir', 'hostless')
1941 super(HostlessQueueTask, self).prolog()
1942
1943
mbligh4608b002010-01-05 18:22:35 +00001944 def _finish_task(self):
1945 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00001946 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001947
1948
showardd3dc1992009-04-22 21:01:40 +00001949class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00001950 def __init__(self, queue_entries, log_file_name):
1951 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00001952
showardd1195652009-12-08 22:21:02 +00001953 self.queue_entries = queue_entries
1954
showardd3dc1992009-04-22 21:01:40 +00001955 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00001956 self._autoserv_monitor.attach_to_existing_process(
1957 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00001958
showardd1195652009-12-08 22:21:02 +00001959
1960 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00001961 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00001962 return 'true'
1963 return self._generate_command(
1964 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00001965
1966
1967 def _generate_command(self, results_dir):
1968 raise NotImplementedError('Subclasses must override this')
1969
1970
showardd1195652009-12-08 22:21:02 +00001971 @property
1972 def owner_username(self):
1973 return self.queue_entries[0].job.owner
1974
1975
1976 def _working_directory(self):
1977 return self._get_consistent_execution_path(self.queue_entries)
1978
1979
1980 def _paired_with_monitor(self):
1981 return self._autoserv_monitor
1982
1983
showardd3dc1992009-04-22 21:01:40 +00001984 def _job_was_aborted(self):
1985 was_aborted = None
showardd1195652009-12-08 22:21:02 +00001986 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00001987 queue_entry.update_from_database()
1988 if was_aborted is None: # first queue entry
1989 was_aborted = bool(queue_entry.aborted)
1990 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
jamesren17cadd62010-06-16 23:26:55 +00001991 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
1992 for entry in self.queue_entries]
showardd3dc1992009-04-22 21:01:40 +00001993 email_manager.manager.enqueue_notify_email(
jamesren17cadd62010-06-16 23:26:55 +00001994 'Inconsistent abort state',
1995 'Queue entries have inconsistent abort state:\n' +
1996 '\n'.join(entries))
showardd3dc1992009-04-22 21:01:40 +00001997 # don't crash here, just assume true
1998 return True
1999 return was_aborted
2000
2001
showardd1195652009-12-08 22:21:02 +00002002 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002003 if self._job_was_aborted():
2004 return models.HostQueueEntry.Status.ABORTED
2005
2006 # we'll use a PidfileRunMonitor to read the autoserv exit status
2007 if self._autoserv_monitor.exit_code() == 0:
2008 return models.HostQueueEntry.Status.COMPLETED
2009 return models.HostQueueEntry.Status.FAILED
2010
2011
showardd3dc1992009-04-22 21:01:40 +00002012 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002013 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002014 queue_entry.set_status(status)
2015
2016
2017 def abort(self):
2018 # override AgentTask.abort() to avoid killing the process and ending
2019 # the task. post-job tasks continue when the job is aborted.
2020 pass
2021
2022
mbligh4608b002010-01-05 18:22:35 +00002023 def _pidfile_label(self):
2024 # '.autoserv_execute' -> 'autoserv'
2025 return self._pidfile_name()[1:-len('_execute')]
2026
2027
showard9bb960b2009-11-19 01:02:11 +00002028class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002029 """
2030 Task responsible for
2031 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2032 * copying logs to the results repository
2033 * spawning CleanupTasks for hosts, if necessary
2034 * spawning a FinalReparseTask for the job
2035 """
showardd1195652009-12-08 22:21:02 +00002036 def __init__(self, queue_entries, recover_run_monitor=None):
2037 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002038 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002039 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002040 self._set_ids(queue_entries=queue_entries)
2041
2042
2043 def _generate_command(self, results_dir):
2044 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002045 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00002046        return [_autoserv_path, '-p',
2047 '--pidfile-label=%s' % self._pidfile_label(),
2048 '--use-existing-results', '--collect-crashinfo',
2049 '-m', host_list, '-r', results_dir]
showardd3dc1992009-04-22 21:01:40 +00002050
2051
showardd1195652009-12-08 22:21:02 +00002052 @property
2053 def num_processes(self):
2054 return len(self.queue_entries)
2055
2056
2057 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002058 return drone_manager.CRASHINFO_PID_FILE
showardd1195652009-12-08 22:21:02 +00002059
2060
showardd3dc1992009-04-22 21:01:40 +00002061 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002062 self._check_queue_entry_statuses(
2063 self.queue_entries,
2064 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
2065 allowed_host_statuses=(models.Host.Status.RUNNING,))
showard8cc058f2009-09-08 16:26:33 +00002066
showardd3dc1992009-04-22 21:01:40 +00002067 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002068
2069
showardd3dc1992009-04-22 21:01:40 +00002070 def epilog(self):
2071 super(GatherLogsTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002072 self._parse_results(self.queue_entries)
showard9bb960b2009-11-19 01:02:11 +00002073 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002074
showard9bb960b2009-11-19 01:02:11 +00002075
2076 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002077 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002078 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002079 models.HostQueueEntry.Status.COMPLETED)
2080 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2081 else:
2082 final_success = False
2083 num_tests_failed = 0
2084
showard9bb960b2009-11-19 01:02:11 +00002085 reboot_after = self._job.reboot_after
2086 do_reboot = (
2087 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002088 self._final_status() == models.HostQueueEntry.Status.ABORTED
jamesrendd855242010-03-02 22:23:44 +00002089 or reboot_after == model_attributes.RebootAfter.ALWAYS
2090 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
showard9bb960b2009-11-19 01:02:11 +00002091 and final_success and num_tests_failed == 0))
2092
showardd1195652009-12-08 22:21:02 +00002093 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002094 if do_reboot:
2095 # don't pass the queue entry to the CleanupTask. if the cleanup
2096 # fails, the job doesn't care -- it's over.
2097 models.SpecialTask.objects.create(
2098 host=models.Host.objects.get(id=queue_entry.host.id),
2099 task=models.SpecialTask.Task.CLEANUP,
2100 requested_by=self._job.owner_model())
2101 else:
2102 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002103
2104
showard0bbfc212009-04-29 21:06:13 +00002105 def run(self):
showard597bfd32009-05-08 18:22:50 +00002106 autoserv_exit_code = self._autoserv_monitor.exit_code()
2107 # only run if Autoserv exited due to some signal. if we have no exit
2108 # code, assume something bad (and signal-like) happened.
2109 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002110 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002111 else:
2112 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002113
2114
mbligh4608b002010-01-05 18:22:35 +00002115class SelfThrottledPostJobTask(PostJobTask):
2116 """
2117 Special AgentTask subclass that maintains its own global process limit.
2118 """
2119 _num_running_processes = 0
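    # Illustrative sketch only: besides the PostJobTask requirements, a
    # concrete subclass mainly provides _max_processes(), e.g. the
    # hypothetical:
    #
    #   class MyThrottledTask(SelfThrottledPostJobTask):
    #       @classmethod
    #       def _max_processes(cls):
    #           return 5
    #
    #       def _generate_command(self, results_dir):
    #           return ['true']  # placeholder; real subclasses build
    #                            # autoserv/parser command lines here
    #
    # See FinalReparseTask and ArchiveResultsTask below for the real
    # subclasses, which cap parse and transfer processes respectively.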
showarded2afea2009-07-07 20:54:07 +00002120
2121
mbligh4608b002010-01-05 18:22:35 +00002122 @classmethod
2123 def _increment_running_processes(cls):
2124 cls._num_running_processes += 1
mbligh16c722d2008-03-05 00:58:44 +00002125
mblighd5c95802008-03-05 00:33:46 +00002126
mbligh4608b002010-01-05 18:22:35 +00002127 @classmethod
2128 def _decrement_running_processes(cls):
2129 cls._num_running_processes -= 1
showard8cc058f2009-09-08 16:26:33 +00002130
2131
mbligh4608b002010-01-05 18:22:35 +00002132 @classmethod
2133 def _max_processes(cls):
2134 raise NotImplementedError
2135
2136
2137 @classmethod
2138 def _can_run_new_process(cls):
2139 return cls._num_running_processes < cls._max_processes()
2140
2141
2142 def _process_started(self):
2143 return bool(self.monitor)
2144
2145
2146 def tick(self):
2147 # override tick to keep trying to start until the process count goes
2148 # down and we can, at which point we revert to default behavior
2149 if self._process_started():
2150 super(SelfThrottledPostJobTask, self).tick()
2151 else:
2152 self._try_starting_process()
2153
2154
2155 def run(self):
2156 # override run() to not actually run unless we can
2157 self._try_starting_process()
2158
2159
2160 def _try_starting_process(self):
2161 if not self._can_run_new_process():
showard775300b2009-09-09 15:30:50 +00002162 return
2163
mbligh4608b002010-01-05 18:22:35 +00002164 # actually run the command
2165 super(SelfThrottledPostJobTask, self).run()
jamesren25663562010-04-27 18:00:55 +00002166 if self._process_started():
2167 self._increment_running_processes()
mblighd5c95802008-03-05 00:33:46 +00002168
mblighd5c95802008-03-05 00:33:46 +00002169
mbligh4608b002010-01-05 18:22:35 +00002170 def finished(self, success):
2171 super(SelfThrottledPostJobTask, self).finished(success)
2172 if self._process_started():
2173 self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002174
showard21baa452008-10-21 00:08:39 +00002175
mbligh4608b002010-01-05 18:22:35 +00002176class FinalReparseTask(SelfThrottledPostJobTask):
showardd1195652009-12-08 22:21:02 +00002177 def __init__(self, queue_entries):
2178 super(FinalReparseTask, self).__init__(queue_entries,
2179 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002180 # don't use _set_ids, since we don't want to set the host_ids
2181 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002182
2183
2184 def _generate_command(self, results_dir):
mbligh4608b002010-01-05 18:22:35 +00002185 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd1195652009-12-08 22:21:02 +00002186 results_dir]
2187
2188
2189 @property
2190 def num_processes(self):
2191 return 0 # don't include parser processes in accounting
2192
2193
2194 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002195 return drone_manager.PARSER_PID_FILE
showardd1195652009-12-08 22:21:02 +00002196
2197
showard97aed502008-11-04 02:01:24 +00002198 @classmethod
mbligh4608b002010-01-05 18:22:35 +00002199 def _max_processes(cls):
2200 return scheduler_config.config.max_parse_processes
showard97aed502008-11-04 02:01:24 +00002201
2202
2203 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002204 self._check_queue_entry_statuses(
2205 self.queue_entries,
2206 allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
showard8cc058f2009-09-08 16:26:33 +00002207
showard97aed502008-11-04 02:01:24 +00002208 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002209
2210
2211 def epilog(self):
2212 super(FinalReparseTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002213 self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002214
2215
mbligh4608b002010-01-05 18:22:35 +00002216class ArchiveResultsTask(SelfThrottledPostJobTask):
showarde1575b52010-01-15 00:21:12 +00002217 _ARCHIVING_FAILED_FILE = '.archiver_failed'
2218
mbligh4608b002010-01-05 18:22:35 +00002219 def __init__(self, queue_entries):
2220 super(ArchiveResultsTask, self).__init__(queue_entries,
2221 log_file_name='.archiving.log')
2222 # don't use _set_ids, since we don't want to set the host_ids
2223 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00002224
2225
mbligh4608b002010-01-05 18:22:35 +00002226 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002227 return drone_manager.ARCHIVER_PID_FILE
showard97aed502008-11-04 02:01:24 +00002228
2229
mbligh4608b002010-01-05 18:22:35 +00002230 def _generate_command(self, results_dir):
2231        return [_autoserv_path, '-p',
2232 '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
mblighe0cbc912010-03-11 18:03:07 +00002233 '--use-existing-results', '--control-filename=control.archive',
showard948eb302010-01-15 00:16:20 +00002234 os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
2235 'archive_results.control.srv')]
showard97aed502008-11-04 02:01:24 +00002236
2237
mbligh4608b002010-01-05 18:22:35 +00002238 @classmethod
2239 def _max_processes(cls):
2240 return scheduler_config.config.max_transfer_processes
showarda9545c02009-12-18 22:44:26 +00002241
2242
2243 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002244 self._check_queue_entry_statuses(
2245 self.queue_entries,
2246 allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
2247
2248 super(ArchiveResultsTask, self).prolog()
showarda9545c02009-12-18 22:44:26 +00002249
2250
mbligh4608b002010-01-05 18:22:35 +00002251 def epilog(self):
2252 super(ArchiveResultsTask, self).epilog()
showard4076c632010-01-15 20:28:49 +00002253 if not self.success and self._paired_with_monitor().has_process():
showarde1575b52010-01-15 00:21:12 +00002254 failed_file = os.path.join(self._working_directory(),
2255 self._ARCHIVING_FAILED_FILE)
2256 paired_process = self._paired_with_monitor().get_process()
2257 _drone_manager.write_lines_to_file(
2258 failed_file, ['Archiving failed with exit code %s'
2259 % self.monitor.exit_code()],
2260 paired_with_process=paired_process)
mbligh4608b002010-01-05 18:22:35 +00002261 self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002262
2263
mbligh36768f02008-02-22 18:28:33 +00002264if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002265 main()