#!/usr/bin/python -u
#pylint: disable-msg=C0111

"""
Autotest scheduler
"""


import common
import datetime, optparse, os, signal
import sys, time, traceback, urllib
import logging, gc

from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.scheduler import scheduler_models
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_testing_mode = False
_drone_manager = None

def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60


def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
            "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
            _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
            logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
                DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, and --test-retry
            parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    autoserv_argv = [_autoserv_path, '-p',
                     '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        autoserv_argv += ['-m', machines]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
        if job.test_retry:
            autoserv_argv += ['--test-retry=' + str(job.test_retry)]
    if verbose:
        autoserv_argv.append('--verbose')
    return autoserv_argv + extra_args
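
# Illustrative sketch (not part of the scheduler): with no job or queue_entry
# supplied, a call such as
#     _autoserv_command_line('host1,host2', ['--some-extra-arg'])
# (the extra argument here is hypothetical) builds roughly:
#     [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY,
#      '-m', 'host1,host2', '--verbose', '--some-extra-arg']
# Passing job= or queue_entry= additionally appends '-u <owner> -l <name>'
# and, when the job requests retries, '--test-retry=<n>' as described in the
# docstring above.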


class BaseDispatcher(object):
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def _log_extra_msg(self, msg):
        if self._extra_debugging:
            logging.debug(msg)


    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins, so we can try to figure out where most of the tick
        time is being spent.
        """
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                                 % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)
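    # Illustrative example of the resulting ordering: given queued tasks
    # [Verify(host1), Repair(host2), Cleanup(host3)], the list returned above
    # is [Repair(host2), Cleanup(host3), Verify(host1)], so repairs always
    # get hosts first.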


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)



    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
                joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
                where='NOT complete AND NOT active AND status="Queued"',
                order_by='afe_jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        logging.debug('Processing %d queue_entries', len(queue_entries))
        for queue_entry in queue_entries:
            self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._log_extra_msg('Scheduling hostless job.')
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)


    def _schedule_running_host_queue_entries(self):
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
                            queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
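    # Illustrative arithmetic: with max_processes_started_per_cycle = 100 and
    # num_started_this_cycle = 90, an agent whose task needs 20 processes is
    # deferred this cycle (90 + 20 > 100); zero-process agents always run, and
    # the first nonzero-process agent of a cycle skips the per-cycle check.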


    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_cycle += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes. %d added this cycle.',
                     _drone_manager.total_running_processes(),
                     num_started_this_cycle)


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0:  # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


SiteDispatcher = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'SiteDispatcher', BaseDispatcher)
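# When no site-specific autotest_lib.scheduler.site_monitor_db module defines
# a SiteDispatcher, import_site_class falls back to the base class given here,
# so in a stock deployment the Dispatcher class below is effectively just
# BaseDispatcher.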

class Dispatcher(SiteDispatcher):
    pass


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """
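    # Usage sketch (illustrative only; argument values are hypothetical):
    #
    #     monitor = PidfileRunMonitor()
    #     monitor.run(command, working_directory, num_processes=1,
    #                 nice_level=AUTOSERV_NICE_LEVEL)
    #     # ...or, to recover a process started on an earlier scheduler run:
    #     monitor.attach_to_existing_process(execution_path)
    #
    # Callers then poll exit_code() each tick; it stays None until the
    # autoserv process has written an exit status to its pidfile (or the
    # process is declared lost).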

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, num_processes, nice_level=None,
            log_file=None, pidfile_name=None, paired_with_pidfile=None,
            username=None, drone_hostnames_allowed=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
                command, working_directory, pidfile_name=pidfile_name,
                num_processes=num_processes, log_file=log_file,
                paired_with_pidfile=paired_with_pidfile, username=username,
                drone_hostnames_allowed=drone_hostnames_allowed)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=drone_manager.AUTOSERV_PID_FILE,
                                   num_processes=None):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
                execution_path, pidfile_name=pidfile_name)
        if num_processes is not None:
            _drone_manager.declare_process_count(self.pidfile_id, num_processes)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
                'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
                self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
                self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                        'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        if time.time() - self._start_time > _get_pidfile_timeout_secs():
            email_manager.manager.enqueue_notify_email(
                    'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """@returns The number of tests that failed or -1 if unknown."""
        self._get_pidfile_info()
        if self._state.num_tests_failed is None:
            return -1
        return self._state.num_tests_failed


    def try_copy_results_on_drone(self, **kwargs):
        if self.has_process():
            # copy results logs into the normal place for job results
            _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)


    def try_copy_to_results_repository(self, source, **kwargs):
        if self.has_process():
            _drone_manager.copy_to_results_repository(self.get_process(),
                                                      source, **kwargs)


class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested. The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: A task as described in the class docstring.
        """
        self.task = task

        # This is filled in by Dispatcher.add_agent_task()
        self.dispatcher = None

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        self.started = True
        if not self.finished:
            self.task.poll()
            if self.task.is_done():
                self.finished = True


    def is_done(self):
        return self.finished


    def abort(self):
        if self.task:
            self.task.abort()
            if self.task.aborted:
                # tasks can choose to ignore aborts
                self.finished = True


class BaseAgentTask(object):
    class _NullMonitor(object):
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
1178 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001179
1180
jadmanski0afbb632008-06-06 21:10:57 +00001181 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001182 """
1183 To be overridden.
1184 """
showarded2afea2009-07-07 20:54:07 +00001185 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001186 self.register_necessary_pidfiles()
1187
1188
1189 def _log_file(self):
1190 if not self._log_file_name:
1191 return None
1192 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001193
mbligh36768f02008-02-22 18:28:33 +00001194
jadmanski0afbb632008-06-06 21:10:57 +00001195 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001196 log_file = self._log_file()
1197 if self.monitor and log_file:
1198 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001199
1200
jadmanski0afbb632008-06-06 21:10:57 +00001201 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001202 """
1203 To be overridden.
1204 """
jadmanski0afbb632008-06-06 21:10:57 +00001205 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001206 logging.info("%s finished with success=%s", type(self).__name__,
1207 self.success)
1208
mbligh36768f02008-02-22 18:28:33 +00001209
1210
jadmanski0afbb632008-06-06 21:10:57 +00001211 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001212 if not self.started:
1213 self.prolog()
1214 self.run()
1215
1216 self.started = True
1217
1218
1219 def abort(self):
1220 if self.monitor:
1221 self.monitor.kill()
1222 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001223 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001224 self.cleanup()
1225
1226
showarded2afea2009-07-07 20:54:07 +00001227 def _get_consistent_execution_path(self, execution_entries):
1228 first_execution_path = execution_entries[0].execution_path()
1229 for execution_entry in execution_entries[1:]:
1230 assert execution_entry.execution_path() == first_execution_path, (
1231 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1232 execution_entry,
1233 first_execution_path,
1234 execution_entries[0]))
1235 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001236
1237
showarded2afea2009-07-07 20:54:07 +00001238 def _copy_results(self, execution_entries, use_monitor=None):
1239 """
1240 @param execution_entries: list of objects with execution_path() method
1241 """
showard6d1c1432009-08-20 23:30:39 +00001242 if use_monitor is not None and not use_monitor.has_process():
1243 return
1244
showarded2afea2009-07-07 20:54:07 +00001245 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001246 if use_monitor is None:
1247 assert self.monitor
1248 use_monitor = self.monitor
1249 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001250 execution_path = self._get_consistent_execution_path(execution_entries)
1251 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001252 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001253
showarda1e74b32009-05-12 17:32:04 +00001254
1255 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001256 for queue_entry in queue_entries:
1257 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001258
1259
mbligh4608b002010-01-05 18:22:35 +00001260 def _archive_results(self, queue_entries):
1261 for queue_entry in queue_entries:
1262 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001263
1264
showardd1195652009-12-08 22:21:02 +00001265 def _command_line(self):
1266 """
1267 Return the command line to run. Must be overridden.
1268 """
1269 raise NotImplementedError
1270
1271
1272 @property
1273 def num_processes(self):
1274 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001275 Return the number of processes forked by this BaseAgentTask's process.
1276 It may only be approximate. To be overridden if necessary.
showardd1195652009-12-08 22:21:02 +00001277 """
1278 return 1
1279
1280
1281 def _paired_with_monitor(self):
1282 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001283 If this BaseAgentTask's process must run on the same machine as some
showardd1195652009-12-08 22:21:02 +00001284 previous process, this method should be overridden to return a
1285 PidfileRunMonitor for that process.
1286 """
1287 return self._NullMonitor()
1288
1289
1290 @property
1291 def owner_username(self):
1292 """
1293 Return login of user responsible for this task. May be None. Must be
1294 overridden.
1295 """
1296 raise NotImplementedError
1297
1298
1299 def _working_directory(self):
1300 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001301 Return the directory where this BaseAgentTask's process executes.
1302 Must be overridden.
showardd1195652009-12-08 22:21:02 +00001303 """
1304 raise NotImplementedError
1305
1306
1307 def _pidfile_name(self):
1308 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001309 Return the name of the pidfile this BaseAgentTask's process uses. To be
showardd1195652009-12-08 22:21:02 +00001310 overridden if necessary.
1311 """
jamesrenc44ae992010-02-19 00:12:54 +00001312 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001313
1314
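# Editor's sketch (illustrative, not part of the original file): a concrete
# task only needs to fill in the overridable pieces above. Assuming a
# hypothetical single-host maintenance command, a subclass might look like:
#
#     class ExampleHostCommandTask(BaseAgentTask):
#         def __init__(self, host):
#             super(ExampleHostCommandTask, self).__init__(
#                     log_file_name='example_command.log')
#             self._host = host
#             self._set_ids(host=host)
#
#         def _command_line(self):
#             return ['true']  # any argv list runnable by the drone manager
#
#         def _working_directory(self):
#             return 'hosts/%s/example_command' % self._host.hostname
#
#         @property
#         def owner_username(self):
#             return None
#
# prolog()/epilog() can then be overridden for status bookkeeping, as the
# SpecialAgentTask subclasses below demonstrate.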
1315 def _check_paired_results_exist(self):
1316 if not self._paired_with_monitor().has_process():
1317 email_manager.manager.enqueue_notify_email(
1318 'No paired results in task',
1319 'No paired results in task %s at %s'
1320 % (self, self._paired_with_monitor().pidfile_id))
1321 self.finished(False)
1322 return False
1323 return True
1324
1325
1326 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001327 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001328 self.monitor = PidfileRunMonitor()
1329
1330
1331 def run(self):
1332 if not self._check_paired_results_exist():
1333 return
1334
1335 self._create_monitor()
1336 self.monitor.run(
1337 self._command_line(), self._working_directory(),
1338 num_processes=self.num_processes,
1339 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1340 pidfile_name=self._pidfile_name(),
1341 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001342 username=self.owner_username,
1343 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1344
1345
1346 def get_drone_hostnames_allowed(self):
1347 if not models.DroneSet.drone_sets_enabled():
1348 return None
1349
1350 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1351 if not hqes:
1352 # Only special tasks could be missing host queue entries
1353 assert isinstance(self, SpecialAgentTask)
1354 return self._user_or_global_default_drone_set(
1355 self.task, self.task.requested_by)
1356
1357 job_ids = hqes.values_list('job', flat=True).distinct()
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001358 assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
jamesren76fcf192010-04-21 20:39:50 +00001359 "span multiple jobs")
1360
1361 job = models.Job.objects.get(id=job_ids[0])
1362 drone_set = job.drone_set
1363 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001364 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001365
1366 return drone_set.get_drone_hostnames()
1367
1368
1369 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1370 """
1371 Returns the user's default drone set, if present.
1372
1373 Otherwise, returns the global default drone set.
1374 """
1375 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1376 if not user:
1377 logging.warn('%s had no owner; using default drone set',
1378 obj_with_owner)
1379 return default_hostnames
1380 if not user.drone_set:
1381 logging.warn('User %s has no default drone set, using global '
1382 'default', user.login)
1383 return default_hostnames
1384 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001385
1386
1387 def register_necessary_pidfiles(self):
1388 pidfile_id = _drone_manager.get_pidfile_id_from(
1389 self._working_directory(), self._pidfile_name())
1390 _drone_manager.register_pidfile(pidfile_id)
1391
1392 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1393 if paired_pidfile_id:
1394 _drone_manager.register_pidfile(paired_pidfile_id)
1395
1396
1397 def recover(self):
1398 if not self._check_paired_results_exist():
1399 return
1400
1401 self._create_monitor()
1402 self.monitor.attach_to_existing_process(
1403 self._working_directory(), pidfile_name=self._pidfile_name(),
1404 num_processes=self.num_processes)
1405 if not self.monitor.has_process():
1406 # no process to recover; wait to be started normally
1407 self.monitor = None
1408 return
1409
1410 self.started = True
Aviv Keshet225bdfe2013-03-05 10:10:08 -08001411 logging.info('Recovering process %s for %s at %s',
1412 self.monitor.get_process(), type(self).__name__,
1413 self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001414
1415
mbligh4608b002010-01-05 18:22:35 +00001416 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1417 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001418 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001419 for entry in queue_entries:
1420 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001421 raise host_scheduler.SchedulerError(
1422 '%s attempting to start entry with invalid status %s: '
1423 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001424 invalid_host_status = (
1425 allowed_host_statuses is not None
1426 and entry.host.status not in allowed_host_statuses)
1427 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001428 raise host_scheduler.SchedulerError(
1429 '%s attempting to start on queue entry with invalid '
1430 'host status %s: %s'
1431 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001432
1433
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001434SiteAgentTask = utils.import_site_class(
1435 __file__, 'autotest_lib.scheduler.site_monitor_db',
1436 'SiteAgentTask', BaseAgentTask)
1437
1438class AgentTask(SiteAgentTask):
1439 pass
1440
1441
showardd9205182009-04-27 20:09:55 +00001442class TaskWithJobKeyvals(object):
1443 """AgentTask mixin providing functionality to help with job keyval files."""
1444 _KEYVAL_FILE = 'keyval'
1445 def _format_keyval(self, key, value):
1446 return '%s=%s' % (key, value)
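# Editor's note (illustrative): the keyval files produced by this mixin are
# plain "key=value" text, one entry per line, e.g.:
#
#     job_queued=1262304000
#     job_finished=1262307600
#
# _write_keyvals_before_job() attaches such content to the execution directory
# before autoserv starts; _write_keyval_after_job() appends lines paired with
# the already-running autoserv process.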
1447
1448
1449 def _keyval_path(self):
1450 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001451 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001452
1453
1454 def _write_keyval_after_job(self, field, value):
1455 assert self.monitor
1456 if not self.monitor.has_process():
1457 return
1458 _drone_manager.write_lines_to_file(
1459 self._keyval_path(), [self._format_keyval(field, value)],
1460 paired_with_process=self.monitor.get_process())
1461
1462
1463 def _job_queued_keyval(self, job):
1464 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1465
1466
1467 def _write_job_finished(self):
1468 self._write_keyval_after_job("job_finished", int(time.time()))
1469
1470
showarddb502762009-09-09 15:31:20 +00001471 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1472 keyval_contents = '\n'.join(self._format_keyval(key, value)
1473 for key, value in keyval_dict.iteritems())
1474 # always end with a newline to allow additional keyvals to be written
1475 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001476 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001477 keyval_contents,
1478 file_path=keyval_path)
1479
1480
1481 def _write_keyvals_before_job(self, keyval_dict):
1482 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1483
1484
1485 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001486 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001487 host.hostname)
1488 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001489 all_labels = [urllib.quote(label) for label in all_labels]
showarddb502762009-09-09 15:31:20 +00001490 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1491 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1492
1493
showard8cc058f2009-09-08 16:26:33 +00001494class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001495 """
1496 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1497 """
1498
1499 TASK_TYPE = None
1500 host = None
1501 queue_entry = None
1502
showardd1195652009-12-08 22:21:02 +00001503 def __init__(self, task, extra_command_args):
1504 super(SpecialAgentTask, self).__init__()
1505
lmrb7c5d272010-04-16 06:34:04 +00001506 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001507
jamesrenc44ae992010-02-19 00:12:54 +00001508 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001509 self.queue_entry = None
1510 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001511 self.queue_entry = scheduler_models.HostQueueEntry(
1512 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001513
showarded2afea2009-07-07 20:54:07 +00001514 self.task = task
1515 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001516
1517
showard8cc058f2009-09-08 16:26:33 +00001518 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001519 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1520
1521
1522 def _command_line(self):
1523 return _autoserv_command_line(self.host.hostname,
1524 self._extra_command_args,
1525 queue_entry=self.queue_entry)
1526
1527
1528 def _working_directory(self):
1529 return self.task.execution_path()
1530
1531
1532 @property
1533 def owner_username(self):
1534 if self.task.requested_by:
1535 return self.task.requested_by.login
1536 return None
showard8cc058f2009-09-08 16:26:33 +00001537
1538
showarded2afea2009-07-07 20:54:07 +00001539 def prolog(self):
1540 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001541 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001542 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001543
1544
showardde634ee2009-01-30 01:44:24 +00001545 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001546 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001547
showard2fe3f1d2009-07-06 20:19:11 +00001548 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001549 return # don't fail metahost entries, they'll be reassigned
1550
showard2fe3f1d2009-07-06 20:19:11 +00001551 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001552 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001553 return # entry has been aborted
1554
showard2fe3f1d2009-07-06 20:19:11 +00001555 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001556 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001557 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001558 self._write_keyval_after_job(queued_key, queued_time)
1559 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001560
showard8cc058f2009-09-08 16:26:33 +00001561 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001562 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001563 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001564 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001565
showard8cc058f2009-09-08 16:26:33 +00001566 pidfile_id = _drone_manager.get_pidfile_id_from(
1567 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001568 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001569 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001570
1571 if self.queue_entry.job.parse_failed_repair:
1572 self._parse_results([self.queue_entry])
1573 else:
1574 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001575
1576
1577 def cleanup(self):
1578 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001579
1580 # We will consider an aborted task to be "Failed"
1581 self.task.finish(bool(self.success))
1582
showardf85a0b72009-10-07 20:48:45 +00001583 if self.monitor:
1584 if self.monitor.has_process():
1585 self._copy_results([self.task])
1586 if self.monitor.pidfile_id is not None:
1587 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001588
1589
1590class RepairTask(SpecialAgentTask):
1591 TASK_TYPE = models.SpecialTask.Task.REPAIR
1592
1593
showardd1195652009-12-08 22:21:02 +00001594 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001595 """\
1596 @param task: the models.SpecialTask for this repair; its queue entry (if any) is marked failed if this repair fails.
1597 """
1598 protection = host_protections.Protection.get_string(
1599 task.host.protection)
1600 # normalize the protection name
1601 protection = host_protections.Protection.get_attr_name(protection)
1602
1603 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001604 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001605
1606 # *don't* include the queue entry in IDs -- if the queue entry is
1607 # aborted, we want to leave the repair task running
1608 self._set_ids(host=self.host)
1609
1610
1611 def prolog(self):
1612 super(RepairTask, self).prolog()
1613 logging.info("repair_task starting")
1614 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001615
1616
jadmanski0afbb632008-06-06 21:10:57 +00001617 def epilog(self):
1618 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001619
jadmanski0afbb632008-06-06 21:10:57 +00001620 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001621 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001622 else:
showard8cc058f2009-09-08 16:26:33 +00001623 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001624 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001625 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001626
1627
showarded2afea2009-07-07 20:54:07 +00001628class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001629 def _copy_to_results_repository(self):
1630 if not self.queue_entry or self.queue_entry.meta_host:
1631 return
1632
1633 self.queue_entry.set_execution_subdir()
1634 log_name = os.path.basename(self.task.execution_path())
1635 source = os.path.join(self.task.execution_path(), 'debug',
1636 'autoserv.DEBUG')
1637 destination = os.path.join(
1638 self.queue_entry.execution_path(), log_name)
1639
1640 self.monitor.try_copy_to_results_repository(
1641 source, destination_path=destination)
1642
1643
showard170873e2009-01-07 00:22:26 +00001644 def epilog(self):
1645 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001646
showard775300b2009-09-09 15:30:50 +00001647 if self.success:
1648 return
showard8fe93b52008-11-18 17:53:22 +00001649
showard775300b2009-09-09 15:30:50 +00001650 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001651
showard775300b2009-09-09 15:30:50 +00001652 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001653 # effectively ignore failure for these hosts
1654 self.success = True
showard775300b2009-09-09 15:30:50 +00001655 return
1656
1657 if self.queue_entry:
1658 self.queue_entry.requeue()
1659
1660 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001661 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001662 queue_entry__id=self.queue_entry.id):
1663 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1664 self._fail_queue_entry()
1665 return
1666
showard9bb960b2009-11-19 01:02:11 +00001667 queue_entry = models.HostQueueEntry.objects.get(
1668 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001669 else:
1670 queue_entry = None
1671
1672 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001673 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001674 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001675 queue_entry=queue_entry,
1676 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001677
showard8fe93b52008-11-18 17:53:22 +00001678
1679class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001680 TASK_TYPE = models.SpecialTask.Task.VERIFY
1681
1682
showardd1195652009-12-08 22:21:02 +00001683 def __init__(self, task):
1684 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001685 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001686
1687
jadmanski0afbb632008-06-06 21:10:57 +00001688 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001689 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001690
showardb18134f2009-03-20 20:52:18 +00001691 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001692 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001693 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1694 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001695
jamesren42318f72010-05-10 23:40:59 +00001696 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00001697 # and there's no need to keep records of other requests.
1698 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00001699 host__id=self.host.id,
1700 task=models.SpecialTask.Task.VERIFY,
jamesren42318f72010-05-10 23:40:59 +00001701 is_active=False, is_complete=False, queue_entry=None)
showarded2afea2009-07-07 20:54:07 +00001702 queued_verifies = queued_verifies.exclude(id=self.task.id)
1703 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00001704
mbligh36768f02008-02-22 18:28:33 +00001705
jadmanski0afbb632008-06-06 21:10:57 +00001706 def epilog(self):
1707 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001708 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001709 if self.queue_entry:
1710 self.queue_entry.on_pending()
1711 else:
1712 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001713
1714
mbligh4608b002010-01-05 18:22:35 +00001715class CleanupTask(PreJobTask):
1716 # note this can also run post-job, but when it does, it's running standalone
1717 # against the host (not related to the job), so it's not considered a
1718 # PostJobTask
1719
1720 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1721
1722
1723 def __init__(self, task, recover_run_monitor=None):
1724 super(CleanupTask, self).__init__(task, ['--cleanup'])
1725 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1726
1727
1728 def prolog(self):
1729 super(CleanupTask, self).prolog()
1730 logging.info("starting cleanup task for host: %s", self.host.hostname)
1731 self.host.set_status(models.Host.Status.CLEANING)
1732 if self.queue_entry:
1733 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1734
1735
1736 def _finish_epilog(self):
1737 if not self.queue_entry or not self.success:
1738 return
1739
1740 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
1741 should_run_verify = (
1742 self.queue_entry.job.run_verify
1743 and self.host.protection != do_not_verify_protection)
1744 if should_run_verify:
1745 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1746 models.SpecialTask.objects.create(
1747 host=models.Host.objects.get(id=self.host.id),
1748 queue_entry=entry,
1749 task=models.SpecialTask.Task.VERIFY)
1750 else:
1751 self.queue_entry.on_pending()
1752
1753
1754 def epilog(self):
1755 super(CleanupTask, self).epilog()
1756
1757 if self.success:
1758 self.host.update_field('dirty', 0)
1759 self.host.set_status(models.Host.Status.READY)
1760
1761 self._finish_epilog()
1762
1763
showarda9545c02009-12-18 22:44:26 +00001764class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
1765 """
1766 Common functionality for QueueTask and HostlessQueueTask
1767 """
1768 def __init__(self, queue_entries):
1769 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001770 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001771 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001772
1773
showard73ec0442009-02-07 02:05:20 +00001774 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001775 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001776
1777
jamesrenc44ae992010-02-19 00:12:54 +00001778 def _write_control_file(self, execution_path):
1779 control_path = _drone_manager.attach_file_to_execution(
1780 execution_path, self.job.control_file)
1781 return control_path
1782
1783
showardd1195652009-12-08 22:21:02 +00001784 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001785 execution_path = self.queue_entries[0].execution_path()
1786 control_path = self._write_control_file(execution_path)
1787 hostnames = ','.join(entry.host.hostname
1788 for entry in self.queue_entries
1789 if not entry.is_hostless())
1790
1791 execution_tag = self.queue_entries[0].execution_tag()
1792 params = _autoserv_command_line(
1793 hostnames,
1794 ['-P', execution_tag, '-n',
1795 _drone_manager.absolute_path(control_path)],
1796 job=self.job, verbose=False)
1797
1798 if not self.job.is_server_job():
1799 params.append('-c')
1800
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001801 if self.job.is_image_update_job():
1802 params += ['--image', self.job.update_image_path]
1803
jamesrenc44ae992010-02-19 00:12:54 +00001804 return params
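# Editor's note (illustrative): the list returned above becomes the argv that
# BaseAgentTask.run() hands to PidfileRunMonitor.run(). On top of whatever
# _autoserv_command_line() itself contributes, this method adds:
#     -P <execution_tag>   the queue entries' execution tag
#     -n <control_path>    absolute path of the control file attached above
#     -c                   only for client-side (non-server) jobs
#     --image <path>       only for image-update jobs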
showardd1195652009-12-08 22:21:02 +00001805
1806
1807 @property
1808 def num_processes(self):
1809 return len(self.queue_entries)
1810
1811
1812 @property
1813 def owner_username(self):
1814 return self.job.owner
1815
1816
1817 def _working_directory(self):
1818 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001819
1820
jadmanski0afbb632008-06-06 21:10:57 +00001821 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001822 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001823 keyval_dict = self.job.keyval_dict()
1824 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001825 group_name = self.queue_entries[0].get_group_name()
1826 if group_name:
1827 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001828 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001829 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001830 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001831 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001832
1833
showard35162b02009-03-03 02:17:30 +00001834 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001835 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001836 _drone_manager.write_lines_to_file(error_file_path,
1837 [_LOST_PROCESS_ERROR])
1838
1839
showardd3dc1992009-04-22 21:01:40 +00001840 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001841 if not self.monitor:
1842 return
1843
showardd9205182009-04-27 20:09:55 +00001844 self._write_job_finished()
1845
showard35162b02009-03-03 02:17:30 +00001846 if self.monitor.lost_process:
1847 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001848
jadmanskif7fa2cc2008-10-01 14:13:23 +00001849
showardcbd74612008-11-19 21:42:02 +00001850 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001851 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001852 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001853 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001854 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001855
1856
jadmanskif7fa2cc2008-10-01 14:13:23 +00001857 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001858 if not self.monitor or not self.monitor.has_process():
1859 return
1860
jadmanskif7fa2cc2008-10-01 14:13:23 +00001861 # build up sets of all the aborted_by and aborted_on values
1862 aborted_by, aborted_on = set(), set()
1863 for queue_entry in self.queue_entries:
1864 if queue_entry.aborted_by:
1865 aborted_by.add(queue_entry.aborted_by)
1866 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1867 aborted_on.add(t)
1868
1869 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001870 # TODO(showard): this conditional is now obsolete, we just need to leave
1871 # it in temporarily for backwards compatibility over upgrades. delete
1872 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001873 assert len(aborted_by) <= 1
1874 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001875 aborted_by_value = aborted_by.pop()
1876 aborted_on_value = max(aborted_on)
1877 else:
1878 aborted_by_value = 'autotest_system'
1879 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001880
showarda0382352009-02-11 23:36:43 +00001881 self._write_keyval_after_job("aborted_by", aborted_by_value)
1882 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001883
showardcbd74612008-11-19 21:42:02 +00001884 aborted_on_string = str(datetime.datetime.fromtimestamp(
1885 aborted_on_value))
1886 self._write_status_comment('Job aborted by %s on %s' %
1887 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001888
1889
jadmanski0afbb632008-06-06 21:10:57 +00001890 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001891 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001892 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001893 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001894
1895
jadmanski0afbb632008-06-06 21:10:57 +00001896 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001897 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001898 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001899
1900
1901class QueueTask(AbstractQueueTask):
1902 def __init__(self, queue_entries):
1903 super(QueueTask, self).__init__(queue_entries)
1904 self._set_ids(queue_entries=queue_entries)
1905
1906
1907 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001908 self._check_queue_entry_statuses(
1909 self.queue_entries,
1910 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
1911 models.HostQueueEntry.Status.RUNNING),
1912 allowed_host_statuses=(models.Host.Status.PENDING,
1913 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00001914
1915 super(QueueTask, self).prolog()
1916
1917 for queue_entry in self.queue_entries:
1918 self._write_host_keyvals(queue_entry.host)
1919 queue_entry.host.set_status(models.Host.Status.RUNNING)
1920 queue_entry.host.update_field('dirty', 1)
1921 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
1922 # TODO(gps): Remove this if nothing needs it anymore.
1923 # A potential user is: tko/parser
1924 self.job.write_to_machines_file(self.queue_entries[0])
1925
1926
1927 def _finish_task(self):
1928 super(QueueTask, self)._finish_task()
1929
1930 for queue_entry in self.queue_entries:
1931 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00001932 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00001933
1934
mbligh4608b002010-01-05 18:22:35 +00001935class HostlessQueueTask(AbstractQueueTask):
1936 def __init__(self, queue_entry):
1937 super(HostlessQueueTask, self).__init__([queue_entry])
1938 self.queue_entry_ids = [queue_entry.id]
1939
1940
1941 def prolog(self):
1942 self.queue_entries[0].update_field('execution_subdir', 'hostless')
1943 super(HostlessQueueTask, self).prolog()
1944
1945
mbligh4608b002010-01-05 18:22:35 +00001946 def _finish_task(self):
1947 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00001948 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001949
1950
showardd3dc1992009-04-22 21:01:40 +00001951class PostJobTask(AgentTask):
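    """
    Base class for tasks that run over the results of an earlier autoserv
    execution (log collection, parsing, archiving).

    Subclasses supply _generate_command(); this class pairs the new process
    with the original autoserv run via a PidfileRunMonitor and derives the
    final HostQueueEntry status from the autoserv exit code and abort state.
    """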
showardd1195652009-12-08 22:21:02 +00001952 def __init__(self, queue_entries, log_file_name):
1953 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00001954
showardd1195652009-12-08 22:21:02 +00001955 self.queue_entries = queue_entries
1956
showardd3dc1992009-04-22 21:01:40 +00001957 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00001958 self._autoserv_monitor.attach_to_existing_process(
1959 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00001960
showardd1195652009-12-08 22:21:02 +00001961
1962 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00001963 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00001964 return 'true'
1965 return self._generate_command(
1966 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00001967
1968
1969 def _generate_command(self, results_dir):
1970 raise NotImplementedError('Subclasses must override this')
1971
1972
showardd1195652009-12-08 22:21:02 +00001973 @property
1974 def owner_username(self):
1975 return self.queue_entries[0].job.owner
1976
1977
1978 def _working_directory(self):
1979 return self._get_consistent_execution_path(self.queue_entries)
1980
1981
1982 def _paired_with_monitor(self):
1983 return self._autoserv_monitor
1984
1985
showardd3dc1992009-04-22 21:01:40 +00001986 def _job_was_aborted(self):
1987 was_aborted = None
showardd1195652009-12-08 22:21:02 +00001988 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00001989 queue_entry.update_from_database()
1990 if was_aborted is None: # first queue entry
1991 was_aborted = bool(queue_entry.aborted)
1992 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
jamesren17cadd62010-06-16 23:26:55 +00001993 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
1994 for entry in self.queue_entries]
showardd3dc1992009-04-22 21:01:40 +00001995 email_manager.manager.enqueue_notify_email(
jamesren17cadd62010-06-16 23:26:55 +00001996 'Inconsistent abort state',
1997 'Queue entries have inconsistent abort state:\n' +
1998 '\n'.join(entries))
showardd3dc1992009-04-22 21:01:40 +00001999 # don't crash here, just assume true
2000 return True
2001 return was_aborted
2002
2003
showardd1195652009-12-08 22:21:02 +00002004 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002005 if self._job_was_aborted():
2006 return models.HostQueueEntry.Status.ABORTED
2007
2008 # we'll use a PidfileRunMonitor to read the autoserv exit status
2009 if self._autoserv_monitor.exit_code() == 0:
2010 return models.HostQueueEntry.Status.COMPLETED
2011 return models.HostQueueEntry.Status.FAILED
2012
2013
showardd3dc1992009-04-22 21:01:40 +00002014 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002015 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002016 queue_entry.set_status(status)
2017
2018
2019 def abort(self):
2020 # override AgentTask.abort() to avoid killing the process and ending
2021 # the task. post-job tasks continue when the job is aborted.
2022 pass
2023
2024
mbligh4608b002010-01-05 18:22:35 +00002025 def _pidfile_label(self):
2026 # '.autoserv_execute' -> 'autoserv'
2027 return self._pidfile_name()[1:-len('_execute')]
2028
2029
showard9bb960b2009-11-19 01:02:11 +00002030class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002031 """
2032 Task responsible for
2033 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2034 * copying logs to the results repository
2035 * spawning CleanupTasks for hosts, if necessary
2036 * spawning a FinalReparseTask for the job
2037 """
showardd1195652009-12-08 22:21:02 +00002038 def __init__(self, queue_entries, recover_run_monitor=None):
2039 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002040 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002041 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002042 self._set_ids(queue_entries=queue_entries)
2043
2044
2045 def _generate_command(self, results_dir):
2046 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002047 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00002048 return [_autoserv_path, '-p',
2049 '--pidfile-label=%s' % self._pidfile_label(),
2050 '--use-existing-results', '--collect-crashinfo',
2051 '-m', host_list, '-r', results_dir]
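        # Editor's note (illustrative): for two hosts the list above expands to
        # something like
        #     autoserv -p --pidfile-label=<label from CRASHINFO_PID_FILE>
        #         --use-existing-results --collect-crashinfo
        #         -m host1,host2 -r <absolute results dir>
        # i.e. autoserv is re-invoked over the existing results directory
        # purely to gather crash information.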
showardd3dc1992009-04-22 21:01:40 +00002052
2053
showardd1195652009-12-08 22:21:02 +00002054 @property
2055 def num_processes(self):
2056 return len(self.queue_entries)
2057
2058
2059 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002060 return drone_manager.CRASHINFO_PID_FILE
showardd1195652009-12-08 22:21:02 +00002061
2062
showardd3dc1992009-04-22 21:01:40 +00002063 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002064 self._check_queue_entry_statuses(
2065 self.queue_entries,
2066 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
2067 allowed_host_statuses=(models.Host.Status.RUNNING,))
showard8cc058f2009-09-08 16:26:33 +00002068
showardd3dc1992009-04-22 21:01:40 +00002069 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002070
2071
showardd3dc1992009-04-22 21:01:40 +00002072 def epilog(self):
2073 super(GatherLogsTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002074 self._parse_results(self.queue_entries)
showard9bb960b2009-11-19 01:02:11 +00002075 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002076
showard9bb960b2009-11-19 01:02:11 +00002077
2078 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002079 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002080 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002081 models.HostQueueEntry.Status.COMPLETED)
2082 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2083 else:
2084 final_success = False
2085 num_tests_failed = 0
2086
showard9bb960b2009-11-19 01:02:11 +00002087 reboot_after = self._job.reboot_after
2088 do_reboot = (
2089 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002090 self._final_status() == models.HostQueueEntry.Status.ABORTED
jamesrendd855242010-03-02 22:23:44 +00002091 or reboot_after == model_attributes.RebootAfter.ALWAYS
2092 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
showard9bb960b2009-11-19 01:02:11 +00002093 and final_success and num_tests_failed == 0))
2094
showardd1195652009-12-08 22:21:02 +00002095 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002096 if do_reboot:
2097 # don't pass the queue entry to the CleanupTask. if the cleanup
2098 # fails, the job doesn't care -- it's over.
2099 models.SpecialTask.objects.create(
2100 host=models.Host.objects.get(id=queue_entry.host.id),
2101 task=models.SpecialTask.Task.CLEANUP,
2102 requested_by=self._job.owner_model())
2103 else:
2104 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002105
2106
showard0bbfc212009-04-29 21:06:13 +00002107 def run(self):
showard597bfd32009-05-08 18:22:50 +00002108 autoserv_exit_code = self._autoserv_monitor.exit_code()
2109 # only run if Autoserv exited due to some signal. if we have no exit
2110 # code, assume something bad (and signal-like) happened.
2111 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002112 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002113 else:
2114 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002115
2116
mbligh4608b002010-01-05 18:22:35 +00002117class SelfThrottledPostJobTask(PostJobTask):
2118 """
2119 Special AgentTask subclass that maintains its own global process limit.
2120 """
2121 _num_running_processes = 0
showarded2afea2009-07-07 20:54:07 +00002122
2123
mbligh4608b002010-01-05 18:22:35 +00002124 @classmethod
2125 def _increment_running_processes(cls):
2126 cls._num_running_processes += 1
mbligh16c722d2008-03-05 00:58:44 +00002127
mblighd5c95802008-03-05 00:33:46 +00002128
mbligh4608b002010-01-05 18:22:35 +00002129 @classmethod
2130 def _decrement_running_processes(cls):
2131 cls._num_running_processes -= 1
showard8cc058f2009-09-08 16:26:33 +00002132
2133
mbligh4608b002010-01-05 18:22:35 +00002134 @classmethod
2135 def _max_processes(cls):
2136 raise NotImplementedError
2137
2138
2139 @classmethod
2140 def _can_run_new_process(cls):
2141 return cls._num_running_processes < cls._max_processes()
2142
2143
2144 def _process_started(self):
2145 return bool(self.monitor)
2146
2147
2148 def tick(self):
2149 # override tick to keep trying to start until the process count drops
2150 # enough that we can start; once started, revert to default behavior
2151 if self._process_started():
2152 super(SelfThrottledPostJobTask, self).tick()
2153 else:
2154 self._try_starting_process()
2155
2156
2157 def run(self):
2158 # override run() to not actually run unless we can
2159 self._try_starting_process()
2160
2161
2162 def _try_starting_process(self):
2163 if not self._can_run_new_process():
showard775300b2009-09-09 15:30:50 +00002164 return
2165
mbligh4608b002010-01-05 18:22:35 +00002166 # actually run the command
2167 super(SelfThrottledPostJobTask, self).run()
jamesren25663562010-04-27 18:00:55 +00002168 if self._process_started():
2169 self._increment_running_processes()
mblighd5c95802008-03-05 00:33:46 +00002170
mblighd5c95802008-03-05 00:33:46 +00002171
mbligh4608b002010-01-05 18:22:35 +00002172 def finished(self, success):
2173 super(SelfThrottledPostJobTask, self).finished(success)
2174 if self._process_started():
2175 self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002176
showard21baa452008-10-21 00:08:39 +00002177
mbligh4608b002010-01-05 18:22:35 +00002178class FinalReparseTask(SelfThrottledPostJobTask):
showardd1195652009-12-08 22:21:02 +00002179 def __init__(self, queue_entries):
2180 super(FinalReparseTask, self).__init__(queue_entries,
2181 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002182 # don't use _set_ids, since we don't want to set the host_ids
2183 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002184
2185
2186 def _generate_command(self, results_dir):
mbligh4608b002010-01-05 18:22:35 +00002187 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd1195652009-12-08 22:21:02 +00002188 results_dir]
2189
2190
2191 @property
2192 def num_processes(self):
2193 return 0 # don't include parser processes in accounting
2194
2195
2196 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002197 return drone_manager.PARSER_PID_FILE
showardd1195652009-12-08 22:21:02 +00002198
2199
showard97aed502008-11-04 02:01:24 +00002200 @classmethod
mbligh4608b002010-01-05 18:22:35 +00002201 def _max_processes(cls):
2202 return scheduler_config.config.max_parse_processes
showard97aed502008-11-04 02:01:24 +00002203
2204
2205 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002206 self._check_queue_entry_statuses(
2207 self.queue_entries,
2208 allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
showard8cc058f2009-09-08 16:26:33 +00002209
showard97aed502008-11-04 02:01:24 +00002210 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002211
2212
2213 def epilog(self):
2214 super(FinalReparseTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002215 self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002216
2217
mbligh4608b002010-01-05 18:22:35 +00002218class ArchiveResultsTask(SelfThrottledPostJobTask):
showarde1575b52010-01-15 00:21:12 +00002219 _ARCHIVING_FAILED_FILE = '.archiver_failed'
2220
mbligh4608b002010-01-05 18:22:35 +00002221 def __init__(self, queue_entries):
2222 super(ArchiveResultsTask, self).__init__(queue_entries,
2223 log_file_name='.archiving.log')
2224 # don't use _set_ids, since we don't want to set the host_ids
2225 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00002226
2227
mbligh4608b002010-01-05 18:22:35 +00002228 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002229 return drone_manager.ARCHIVER_PID_FILE
showard97aed502008-11-04 02:01:24 +00002230
2231
mbligh4608b002010-01-05 18:22:35 +00002232 def _generate_command(self, results_dir):
2233 return [_autoserv_path, '-p',
2234 '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
mblighe0cbc912010-03-11 18:03:07 +00002235 '--use-existing-results', '--control-filename=control.archive',
showard948eb302010-01-15 00:16:20 +00002236 os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
2237 'archive_results.control.srv')]
showard97aed502008-11-04 02:01:24 +00002238
2239
mbligh4608b002010-01-05 18:22:35 +00002240 @classmethod
2241 def _max_processes(cls):
2242 return scheduler_config.config.max_transfer_processes
showarda9545c02009-12-18 22:44:26 +00002243
2244
2245 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002246 self._check_queue_entry_statuses(
2247 self.queue_entries,
2248 allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
2249
2250 super(ArchiveResultsTask, self).prolog()
showarda9545c02009-12-18 22:44:26 +00002251
2252
mbligh4608b002010-01-05 18:22:35 +00002253 def epilog(self):
2254 super(ArchiveResultsTask, self).epilog()
showard4076c632010-01-15 20:28:49 +00002255 if not self.success and self._paired_with_monitor().has_process():
showarde1575b52010-01-15 00:21:12 +00002256 failed_file = os.path.join(self._working_directory(),
2257 self._ARCHIVING_FAILED_FILE)
2258 paired_process = self._paired_with_monitor().get_process()
2259 _drone_manager.write_lines_to_file(
2260 failed_file, ['Archiving failed with exit code %s'
2261 % self.monitor.exit_code()],
2262 paired_with_process=paired_process)
mbligh4608b002010-01-05 18:22:35 +00002263 self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002264
2265
mbligh36768f02008-02-22 18:28:33 +00002266if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002267 main()