#!/usr/bin/python -u
#pylint: disable-msg=C0111

"""
Autotest scheduler
"""


import datetime, optparse, os, signal
import sys, time, traceback, urllib
import logging, gc

import common
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.site_utils.graphite import stats

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_testing_mode = False
_drone_manager = None


def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
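    # The timeout is read from the scheduler section of the global config.  As
    # an illustration only (key name taken from the lookup below, the value is
    # just an example), the corresponding global_config.ini entry would look
    # like:
    #
    #     [SCHEDULER]
    #     pidfile_timeout_mins: 300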
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60


def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
            logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, and --test-retry
            parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
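    # Illustrative sketch only (the owner and job name below are made up): for
    # machines='host1' and extra_args=[], a job owned by 'debug_user' named
    # 'sleeptest' with test_retry unset would produce
    #
    #     [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY,
    #      '-m', 'host1', '-u', 'debug_user', '-l', 'sleeptest', '--verbose']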
    autoserv_argv = [_autoserv_path, '-p',
                     '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        autoserv_argv += ['-m', machines]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
        if job.test_retry:
            autoserv_argv += ['--test-retry=' + str(job.test_retry)]
    if verbose:
        autoserv_argv.append('--verbose')
    return autoserv_argv + extra_args


class BaseDispatcher(object):


    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def _log_extra_msg(self, msg):
        if self._extra_debugging:
            logging.debug(msg)


    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)



    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='afe_jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        logging.debug('Processing %d queue_entries', len(queue_entries))
        for queue_entry in queue_entries:
            self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._log_extra_msg('Scheduling hostless job.')
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)


    def _schedule_running_host_queue_entries(self):
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
                            queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True


    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_cycle += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes. %d added this cycle.',
                     _drone_manager.total_running_processes(),
                     num_started_this_cycle)


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


SiteDispatcher = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    pass
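# Illustrative sketch only: a deployment that wants to customize dispatcher
# behavior would define a class named SiteDispatcher in
# autotest_lib/scheduler/site_monitor_db.py, subclassing BaseDispatcher from
# this module and overriding the hooks it cares about, e.g.
#
#     from autotest_lib.scheduler.monitor_db import BaseDispatcher
#
#     class SiteDispatcher(BaseDispatcher):
#         def _schedule_new_jobs(self):
#             # site-specific filtering could go here
#             super(SiteDispatcher, self)._schedule_new_jobs()
#
# utils.import_site_class() above falls back to BaseDispatcher when no such
# module or class exists, so in a stock install Dispatcher is BaseDispatcher.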


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """
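    # Illustrative usage sketch (variable names here are placeholders): a
    # caller either launches a new process,
    #
    #     monitor = PidfileRunMonitor()
    #     monitor.run(command, working_directory, num_processes=1,
    #                 nice_level=AUTOSERV_NICE_LEVEL, log_file=log_file,
    #                 username=owner)
    #
    # or re-attaches to one found during recovery,
    #
    #     monitor = PidfileRunMonitor()
    #     monitor.attach_to_existing_process(execution_path)
    #
    # and then polls exit_code(), which remains None while the process is
    # still running and has not yet written its exit status to the pidfile.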

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, num_processes, nice_level=None,
            log_file=None, pidfile_name=None, paired_with_pidfile=None,
            username=None, drone_hostnames_allowed=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            num_processes=num_processes, log_file=log_file,
            paired_with_pidfile=paired_with_pidfile, username=username,
            drone_hostnames_allowed=drone_hostnames_allowed)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=drone_manager.AUTOSERV_PID_FILE,
                                   num_processes=None):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_path, pidfile_name=pidfile_name)
        if num_processes is not None:
            _drone_manager.declare_process_count(self.pidfile_id, num_processes)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
         pid=None, exit_status=None if autoserv has not yet run
         pid!=None, exit_status=None if autoserv is running
         pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        if time.time() - self._start_time > _get_pidfile_timeout_secs():
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """@returns The number of tests that failed or -1 if unknown."""
        self._get_pidfile_info()
        if self._state.num_tests_failed is None:
            return -1
        return self._state.num_tests_failed


    def try_copy_results_on_drone(self, **kwargs):
        if self.has_process():
            # copy results logs into the normal place for job results
            _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)


    def try_copy_to_results_repository(self, source, **kwargs):
        if self.has_process():
            _drone_manager.copy_to_results_repository(self.get_process(),
                                                      source, **kwargs)


class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested. The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """
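    # Minimal illustrative task satisfying the interface above (a sketch for
    # documentation only; the scheduler's real tasks are the AgentTask classes
    # defined below):
    #
    #     class ExampleTask(object):
    #         def __init__(self):
    #             self.success = None
    #             self.aborted = False
    #             self.queue_entry_ids = []
    #             self.host_ids = []
    #
    #         def poll(self):
    #             self.success = True
    #
    #         def is_done(self):
    #             return self.success is not None
    #
    #         def abort(self):
    #             self.aborted = True
    #
    #     agent = Agent(ExampleTask())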


    def __init__(self, task):
        """
        @param task: A task as described in the class docstring.
        """
        self.task = task

        # This is filled in by Dispatcher.add_agent_task()
        self.dispatcher = None

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        self.started = True
        if not self.finished:
            self.task.poll()
            if self.task.is_done():
                self.finished = True


    def is_done(self):
        return self.finished


    def abort(self):
        if self.task:
            self.task.abort()
            if self.task.aborted:
                # tasks can choose to ignore aborts
                self.finished = True


class BaseAgentTask(object):
    class _NullMonitor(object):
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001179 if self.done:
showardd1195652009-12-08 22:21:02 +00001180 assert self.started
showard08a36412009-05-05 01:01:13 +00001181 return
showardd1195652009-12-08 22:21:02 +00001182 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001183 self.done = True
1184 self.success = success
1185 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001186
1187
jadmanski0afbb632008-06-06 21:10:57 +00001188 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001189 """
1190 To be overridden.
1191 """
showarded2afea2009-07-07 20:54:07 +00001192 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001193 self.register_necessary_pidfiles()
1194
1195
1196 def _log_file(self):
1197 if not self._log_file_name:
1198 return None
1199 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001200
mbligh36768f02008-02-22 18:28:33 +00001201
jadmanski0afbb632008-06-06 21:10:57 +00001202 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001203 log_file = self._log_file()
1204 if self.monitor and log_file:
1205 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001206
1207
jadmanski0afbb632008-06-06 21:10:57 +00001208 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001209 """
1210 To be overridden.
1211 """
jadmanski0afbb632008-06-06 21:10:57 +00001212 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001213 logging.info("%s finished with success=%s", type(self).__name__,
1214 self.success)
1215
mbligh36768f02008-02-22 18:28:33 +00001216
1217
jadmanski0afbb632008-06-06 21:10:57 +00001218 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001219 if not self.started:
1220 self.prolog()
1221 self.run()
1222
1223 self.started = True
1224
1225
1226 def abort(self):
1227 if self.monitor:
1228 self.monitor.kill()
1229 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001230 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001231 self.cleanup()
1232
1233
showarded2afea2009-07-07 20:54:07 +00001234 def _get_consistent_execution_path(self, execution_entries):
1235 first_execution_path = execution_entries[0].execution_path()
1236 for execution_entry in execution_entries[1:]:
1237 assert execution_entry.execution_path() == first_execution_path, (
1238 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1239 execution_entry,
1240 first_execution_path,
1241 execution_entries[0]))
1242 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001243
1244
showarded2afea2009-07-07 20:54:07 +00001245 def _copy_results(self, execution_entries, use_monitor=None):
1246 """
1247 @param execution_entries: list of objects with execution_path() method
1248 """
showard6d1c1432009-08-20 23:30:39 +00001249 if use_monitor is not None and not use_monitor.has_process():
1250 return
1251
showarded2afea2009-07-07 20:54:07 +00001252 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001253 if use_monitor is None:
1254 assert self.monitor
1255 use_monitor = self.monitor
1256 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001257 execution_path = self._get_consistent_execution_path(execution_entries)
1258 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001259 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001260
showarda1e74b32009-05-12 17:32:04 +00001261
1262 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001263 for queue_entry in queue_entries:
1264 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001265
1266
mbligh4608b002010-01-05 18:22:35 +00001267 def _archive_results(self, queue_entries):
1268 for queue_entry in queue_entries:
1269 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001270
1271
showardd1195652009-12-08 22:21:02 +00001272 def _command_line(self):
1273 """
1274 Return the command line to run. Must be overridden.
1275 """
1276 raise NotImplementedError
1277
1278
1279 @property
1280 def num_processes(self):
1281 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001282 Return the number of processes forked by this BaseAgentTask's process.
1283 It may only be approximate. To be overridden if necessary.
showardd1195652009-12-08 22:21:02 +00001284 """
1285 return 1
1286
1287
1288 def _paired_with_monitor(self):
1289 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001290 If this BaseAgentTask's process must run on the same machine as some
showardd1195652009-12-08 22:21:02 +00001291 previous process, this method should be overridden to return a
1292 PidfileRunMonitor for that process.
1293 """
1294 return self._NullMonitor()
1295
1296
1297 @property
1298 def owner_username(self):
1299 """
1300 Return login of user responsible for this task. May be None. Must be
1301 overridden.
1302 """
1303 raise NotImplementedError
1304
1305
1306 def _working_directory(self):
1307 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001308 Return the directory where this BaseAgentTask's process executes.
1309 Must be overridden.
showardd1195652009-12-08 22:21:02 +00001310 """
1311 raise NotImplementedError
1312
1313
1314 def _pidfile_name(self):
1315 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001316 Return the name of the pidfile this BaseAgentTask's process uses. To be
showardd1195652009-12-08 22:21:02 +00001317 overridden if necessary.
1318 """
jamesrenc44ae992010-02-19 00:12:54 +00001319 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001320
1321
1322 def _check_paired_results_exist(self):
1323 if not self._paired_with_monitor().has_process():
1324 email_manager.manager.enqueue_notify_email(
1325 'No paired results in task',
1326 'No paired results in task %s at %s'
1327 % (self, self._paired_with_monitor().pidfile_id))
1328 self.finished(False)
1329 return False
1330 return True
1331
1332
1333 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001334 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001335 self.monitor = PidfileRunMonitor()
1336
1337
1338 def run(self):
1339 if not self._check_paired_results_exist():
1340 return
1341
1342 self._create_monitor()
1343 self.monitor.run(
1344 self._command_line(), self._working_directory(),
1345 num_processes=self.num_processes,
1346 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1347 pidfile_name=self._pidfile_name(),
1348 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001349 username=self.owner_username,
1350 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1351
1352
1353 def get_drone_hostnames_allowed(self):
1354 if not models.DroneSet.drone_sets_enabled():
1355 return None
1356
1357 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1358 if not hqes:
1359 # Only special tasks could be missing host queue entries
1360 assert isinstance(self, SpecialAgentTask)
1361 return self._user_or_global_default_drone_set(
1362 self.task, self.task.requested_by)
1363
1364 job_ids = hqes.values_list('job', flat=True).distinct()
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001365 assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
jamesren76fcf192010-04-21 20:39:50 +00001366 "span multiple jobs")
1367
1368 job = models.Job.objects.get(id=job_ids[0])
1369 drone_set = job.drone_set
1370 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001371 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001372
1373 return drone_set.get_drone_hostnames()
1374
1375
1376 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1377 """
1378 Returns the user's default drone set, if present.
1379
1380 Otherwise, returns the global default drone set.
1381 """
1382 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1383 if not user:
1384 logging.warn('%s had no owner; using default drone set',
1385 obj_with_owner)
1386 return default_hostnames
1387 if not user.drone_set:
1388 logging.warn('User %s has no default drone set, using global '
1389 'default', user.login)
1390 return default_hostnames
1391 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001392
1393
1394 def register_necessary_pidfiles(self):
1395 pidfile_id = _drone_manager.get_pidfile_id_from(
1396 self._working_directory(), self._pidfile_name())
1397 _drone_manager.register_pidfile(pidfile_id)
1398
1399 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1400 if paired_pidfile_id:
1401 _drone_manager.register_pidfile(paired_pidfile_id)
1402
1403
1404 def recover(self):
1405 if not self._check_paired_results_exist():
1406 return
1407
1408 self._create_monitor()
1409 self.monitor.attach_to_existing_process(
1410 self._working_directory(), pidfile_name=self._pidfile_name(),
1411 num_processes=self.num_processes)
1412 if not self.monitor.has_process():
1413 # no process to recover; wait to be started normally
1414 self.monitor = None
1415 return
1416
1417 self.started = True
Aviv Keshet225bdfe2013-03-05 10:10:08 -08001418 logging.info('Recovering process %s for %s at %s',
1419 self.monitor.get_process(), type(self).__name__,
1420 self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001421
1422
mbligh4608b002010-01-05 18:22:35 +00001423 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1424 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001425 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001426 for entry in queue_entries:
1427 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001428 raise host_scheduler.SchedulerError(
1429 '%s attempting to start entry with invalid status %s: '
1430 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001431 invalid_host_status = (
1432 allowed_host_statuses is not None
1433 and entry.host.status not in allowed_host_statuses)
1434 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001435 raise host_scheduler.SchedulerError(
1436 '%s attempting to start on queue entry with invalid '
1437 'host status %s: %s'
1438 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001439
1440
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001441SiteAgentTask = utils.import_site_class(
1442 __file__, 'autotest_lib.scheduler.site_monitor_db',
1443 'SiteAgentTask', BaseAgentTask)
1444
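# In a stock installation there is usually no site_monitor_db module, in which
# case utils.import_site_class() falls back to the default class passed above,
# so AgentTask is effectively just BaseAgentTask; a site can interpose its own
# behavior by providing a SiteAgentTask class in that module. (This note
# describes the usual fallback behavior of import_site_class, not a guarantee
# about any particular deployment.)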
1445class AgentTask(SiteAgentTask):
1446 pass
1447
1448
showardd9205182009-04-27 20:09:55 +00001449class TaskWithJobKeyvals(object):
1450 """AgentTask mixin providing functionality to help with job keyval files."""
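    # Keyvals are written one 'key=value' pair per line (see _format_keyval)
    # into a file named 'keyval' under the task's working directory. As an
    # illustration with a made-up timestamp,
    # _write_keyval_after_job('job_finished', 1268011200) would write the line:
    #
    #   job_finished=1268011200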
1451 _KEYVAL_FILE = 'keyval'
1452 def _format_keyval(self, key, value):
1453 return '%s=%s' % (key, value)
1454
1455
1456 def _keyval_path(self):
1457 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001458 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001459
1460
1461 def _write_keyval_after_job(self, field, value):
1462 assert self.monitor
1463 if not self.monitor.has_process():
1464 return
1465 _drone_manager.write_lines_to_file(
1466 self._keyval_path(), [self._format_keyval(field, value)],
1467 paired_with_process=self.monitor.get_process())
1468
1469
1470 def _job_queued_keyval(self, job):
1471 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1472
1473
1474 def _write_job_finished(self):
1475 self._write_keyval_after_job("job_finished", int(time.time()))
1476
1477
showarddb502762009-09-09 15:31:20 +00001478 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1479 keyval_contents = '\n'.join(self._format_keyval(key, value)
1480 for key, value in keyval_dict.iteritems())
1481 # always end with a newline to allow additional keyvals to be written
1482 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001483 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001484 keyval_contents,
1485 file_path=keyval_path)
1486
1487
1488 def _write_keyvals_before_job(self, keyval_dict):
1489 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1490
1491
1492 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001493 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001494 host.hostname)
1495 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001496        all_labels = [urllib.quote(label) for label in all_labels]
showarddb502762009-09-09 15:31:20 +00001497 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1498 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1499
1500
showard8cc058f2009-09-08 16:26:33 +00001501class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001502 """
1503 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1504 """
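    # Lifecycle, as implemented below: prolog() marks the SpecialTask active
    # and records host keyvals, run() (inherited from BaseAgentTask) launches
    # autoserv with the extra arguments supplied by the subclass, and cleanup()
    # marks the SpecialTask finished and copies its results; an aborted task is
    # recorded as failed.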
1505
1506 TASK_TYPE = None
1507 host = None
1508 queue_entry = None
1509
showardd1195652009-12-08 22:21:02 +00001510 def __init__(self, task, extra_command_args):
1511 super(SpecialAgentTask, self).__init__()
1512
lmrb7c5d272010-04-16 06:34:04 +00001513 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001514
jamesrenc44ae992010-02-19 00:12:54 +00001515 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001516 self.queue_entry = None
1517 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001518 self.queue_entry = scheduler_models.HostQueueEntry(
1519 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001520
showarded2afea2009-07-07 20:54:07 +00001521 self.task = task
1522 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001523
1524
showard8cc058f2009-09-08 16:26:33 +00001525 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001526 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1527
1528
1529 def _command_line(self):
1530 return _autoserv_command_line(self.host.hostname,
1531 self._extra_command_args,
1532 queue_entry=self.queue_entry)
1533
1534
1535 def _working_directory(self):
1536 return self.task.execution_path()
1537
1538
1539 @property
1540 def owner_username(self):
1541 if self.task.requested_by:
1542 return self.task.requested_by.login
1543 return None
showard8cc058f2009-09-08 16:26:33 +00001544
1545
showarded2afea2009-07-07 20:54:07 +00001546 def prolog(self):
1547 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001548 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001549 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001550
1551
showardde634ee2009-01-30 01:44:24 +00001552 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001553 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001554
showard2fe3f1d2009-07-06 20:19:11 +00001555 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001556 return # don't fail metahost entries, they'll be reassigned
1557
showard2fe3f1d2009-07-06 20:19:11 +00001558 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001559 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001560 return # entry has been aborted
1561
showard2fe3f1d2009-07-06 20:19:11 +00001562 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001563 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001564 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001565 self._write_keyval_after_job(queued_key, queued_time)
1566 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001567
showard8cc058f2009-09-08 16:26:33 +00001568 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001569 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001570 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001571 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001572
showard8cc058f2009-09-08 16:26:33 +00001573 pidfile_id = _drone_manager.get_pidfile_id_from(
1574 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001575 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001576 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001577
1578 if self.queue_entry.job.parse_failed_repair:
1579 self._parse_results([self.queue_entry])
1580 else:
1581 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001582
1583
1584 def cleanup(self):
1585 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001586
1587 # We will consider an aborted task to be "Failed"
1588 self.task.finish(bool(self.success))
1589
showardf85a0b72009-10-07 20:48:45 +00001590 if self.monitor:
1591 if self.monitor.has_process():
1592 self._copy_results([self.task])
1593 if self.monitor.pidfile_id is not None:
1594 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001595
1596
1597class RepairTask(SpecialAgentTask):
1598 TASK_TYPE = models.SpecialTask.Task.REPAIR
1599
1600
showardd1195652009-12-08 22:21:02 +00001601 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001602 """\
    1603        task: the SpecialTask entry for this repair; if it references a queue entry, that entry is marked failed when the repair fails.
1604 """
1605 protection = host_protections.Protection.get_string(
1606 task.host.protection)
1607 # normalize the protection name
1608 protection = host_protections.Protection.get_attr_name(protection)
1609
1610 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001611 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001612
1613 # *don't* include the queue entry in IDs -- if the queue entry is
1614 # aborted, we want to leave the repair task running
1615 self._set_ids(host=self.host)
1616
1617
1618 def prolog(self):
1619 super(RepairTask, self).prolog()
1620 logging.info("repair_task starting")
1621 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001622
1623
jadmanski0afbb632008-06-06 21:10:57 +00001624 def epilog(self):
1625 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001626
jadmanski0afbb632008-06-06 21:10:57 +00001627 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001628 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001629 else:
showard8cc058f2009-09-08 16:26:33 +00001630 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001631 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001632 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001633
1634
showarded2afea2009-07-07 20:54:07 +00001635class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001636 def _copy_to_results_repository(self):
1637 if not self.queue_entry or self.queue_entry.meta_host:
1638 return
1639
1640 self.queue_entry.set_execution_subdir()
1641 log_name = os.path.basename(self.task.execution_path())
1642 source = os.path.join(self.task.execution_path(), 'debug',
1643 'autoserv.DEBUG')
1644 destination = os.path.join(
1645 self.queue_entry.execution_path(), log_name)
1646
1647 self.monitor.try_copy_to_results_repository(
1648 source, destination_path=destination)
1649
1650
showard170873e2009-01-07 00:22:26 +00001651 def epilog(self):
1652 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001653
showard775300b2009-09-09 15:30:50 +00001654 if self.success:
1655 return
showard8fe93b52008-11-18 17:53:22 +00001656
showard775300b2009-09-09 15:30:50 +00001657 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001658
showard775300b2009-09-09 15:30:50 +00001659 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001660 # effectively ignore failure for these hosts
1661 self.success = True
showard775300b2009-09-09 15:30:50 +00001662 return
1663
1664 if self.queue_entry:
1665 self.queue_entry.requeue()
1666
1667 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001668 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001669 queue_entry__id=self.queue_entry.id):
1670 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1671 self._fail_queue_entry()
1672 return
1673
showard9bb960b2009-11-19 01:02:11 +00001674 queue_entry = models.HostQueueEntry.objects.get(
1675 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001676 else:
1677 queue_entry = None
1678
1679 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001680 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001681 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001682 queue_entry=queue_entry,
1683 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001684
showard8fe93b52008-11-18 17:53:22 +00001685
1686class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001687 TASK_TYPE = models.SpecialTask.Task.VERIFY
1688
1689
showardd1195652009-12-08 22:21:02 +00001690 def __init__(self, task):
1691 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001692 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001693
1694
jadmanski0afbb632008-06-06 21:10:57 +00001695 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001696 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001697
showardb18134f2009-03-20 20:52:18 +00001698 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001699 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001700 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1701 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001702
jamesren42318f72010-05-10 23:40:59 +00001703 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00001704 # and there's no need to keep records of other requests.
1705 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00001706 host__id=self.host.id,
1707 task=models.SpecialTask.Task.VERIFY,
jamesren42318f72010-05-10 23:40:59 +00001708 is_active=False, is_complete=False, queue_entry=None)
showarded2afea2009-07-07 20:54:07 +00001709 queued_verifies = queued_verifies.exclude(id=self.task.id)
1710 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00001711
mbligh36768f02008-02-22 18:28:33 +00001712
jadmanski0afbb632008-06-06 21:10:57 +00001713 def epilog(self):
1714 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001715 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001716 if self.queue_entry:
1717 self.queue_entry.on_pending()
1718 else:
1719 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001720
1721
mbligh4608b002010-01-05 18:22:35 +00001722class CleanupTask(PreJobTask):
1723 # note this can also run post-job, but when it does, it's running standalone
1724 # against the host (not related to the job), so it's not considered a
1725 # PostJobTask
1726
1727 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1728
1729
1730 def __init__(self, task, recover_run_monitor=None):
1731 super(CleanupTask, self).__init__(task, ['--cleanup'])
1732 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1733
1734
1735 def prolog(self):
1736 super(CleanupTask, self).prolog()
1737 logging.info("starting cleanup task for host: %s", self.host.hostname)
1738 self.host.set_status(models.Host.Status.CLEANING)
1739 if self.queue_entry:
1740 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1741
1742
1743 def _finish_epilog(self):
1744 if not self.queue_entry or not self.success:
1745 return
1746
1747 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
1748 should_run_verify = (
1749 self.queue_entry.job.run_verify
1750 and self.host.protection != do_not_verify_protection)
1751 if should_run_verify:
1752 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1753 models.SpecialTask.objects.create(
1754 host=models.Host.objects.get(id=self.host.id),
1755 queue_entry=entry,
1756 task=models.SpecialTask.Task.VERIFY)
1757 else:
1758 self.queue_entry.on_pending()
1759
1760
1761 def epilog(self):
1762 super(CleanupTask, self).epilog()
1763
1764 if self.success:
1765 self.host.update_field('dirty', 0)
1766 self.host.set_status(models.Host.Status.READY)
1767
1768 self._finish_epilog()
1769
1770
showarda9545c02009-12-18 22:44:26 +00001771class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
1772 """
1773 Common functionality for QueueTask and HostlessQueueTask
1774 """
1775 def __init__(self, queue_entries):
1776 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001777 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001778 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001779
1780
showard73ec0442009-02-07 02:05:20 +00001781 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001782 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001783
1784
jamesrenc44ae992010-02-19 00:12:54 +00001785 def _write_control_file(self, execution_path):
1786 control_path = _drone_manager.attach_file_to_execution(
1787 execution_path, self.job.control_file)
1788 return control_path
1789
1790
showardd1195652009-12-08 22:21:02 +00001791 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001792 execution_path = self.queue_entries[0].execution_path()
1793 control_path = self._write_control_file(execution_path)
1794 hostnames = ','.join(entry.host.hostname
1795 for entry in self.queue_entries
1796 if not entry.is_hostless())
1797
1798 execution_tag = self.queue_entries[0].execution_tag()
1799 params = _autoserv_command_line(
1800 hostnames,
1801 ['-P', execution_tag, '-n',
1802 _drone_manager.absolute_path(control_path)],
1803 job=self.job, verbose=False)
1804
1805 if not self.job.is_server_job():
1806 params.append('-c')
1807
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001808 if self.job.is_image_update_job():
1809 params += ['--image', self.job.update_image_path]
1810
jamesrenc44ae992010-02-19 00:12:54 +00001811 return params
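    # Note on _command_line() above: the flags visible here give autoserv the
    # execution tag ('-P'), the attached control file ('-n'), '-c' for
    # client-side jobs and '--image' for image-update jobs; the host list,
    # owner and remaining options are filled in by _autoserv_command_line(),
    # defined elsewhere in this file.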
showardd1195652009-12-08 22:21:02 +00001812
1813
1814 @property
1815 def num_processes(self):
1816 return len(self.queue_entries)
1817
1818
1819 @property
1820 def owner_username(self):
1821 return self.job.owner
1822
1823
1824 def _working_directory(self):
1825 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001826
1827
jadmanski0afbb632008-06-06 21:10:57 +00001828 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001829 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001830 keyval_dict = self.job.keyval_dict()
1831 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001832 group_name = self.queue_entries[0].get_group_name()
1833 if group_name:
1834 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001835 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001836 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001837 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001838 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001839
1840
showard35162b02009-03-03 02:17:30 +00001841 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001842 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001843 _drone_manager.write_lines_to_file(error_file_path,
1844 [_LOST_PROCESS_ERROR])
1845
1846
showardd3dc1992009-04-22 21:01:40 +00001847 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001848 if not self.monitor:
1849 return
1850
showardd9205182009-04-27 20:09:55 +00001851 self._write_job_finished()
1852
showard35162b02009-03-03 02:17:30 +00001853 if self.monitor.lost_process:
1854 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001855
jadmanskif7fa2cc2008-10-01 14:13:23 +00001856
showardcbd74612008-11-19 21:42:02 +00001857 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001858 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001859 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001860 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001861 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001862
1863
jadmanskif7fa2cc2008-10-01 14:13:23 +00001864 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001865 if not self.monitor or not self.monitor.has_process():
1866 return
1867
jadmanskif7fa2cc2008-10-01 14:13:23 +00001868 # build up sets of all the aborted_by and aborted_on values
1869 aborted_by, aborted_on = set(), set()
1870 for queue_entry in self.queue_entries:
1871 if queue_entry.aborted_by:
1872 aborted_by.add(queue_entry.aborted_by)
1873 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1874 aborted_on.add(t)
1875
1876 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001877 # TODO(showard): this conditional is now obsolete, we just need to leave
1878 # it in temporarily for backwards compatibility over upgrades. delete
1879 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001880 assert len(aborted_by) <= 1
1881 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001882 aborted_by_value = aborted_by.pop()
1883 aborted_on_value = max(aborted_on)
1884 else:
1885 aborted_by_value = 'autotest_system'
1886 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001887
showarda0382352009-02-11 23:36:43 +00001888 self._write_keyval_after_job("aborted_by", aborted_by_value)
1889 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001890
showardcbd74612008-11-19 21:42:02 +00001891 aborted_on_string = str(datetime.datetime.fromtimestamp(
1892 aborted_on_value))
1893 self._write_status_comment('Job aborted by %s on %s' %
1894 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001895
1896
jadmanski0afbb632008-06-06 21:10:57 +00001897 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001898 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001899 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001900 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001901
1902
jadmanski0afbb632008-06-06 21:10:57 +00001903 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001904 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001905 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001906
1907
1908class QueueTask(AbstractQueueTask):
1909 def __init__(self, queue_entries):
1910 super(QueueTask, self).__init__(queue_entries)
1911 self._set_ids(queue_entries=queue_entries)
1912
1913
1914 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001915 self._check_queue_entry_statuses(
1916 self.queue_entries,
1917 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
1918 models.HostQueueEntry.Status.RUNNING),
1919 allowed_host_statuses=(models.Host.Status.PENDING,
1920 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00001921
1922 super(QueueTask, self).prolog()
1923
1924 for queue_entry in self.queue_entries:
1925 self._write_host_keyvals(queue_entry.host)
1926 queue_entry.host.set_status(models.Host.Status.RUNNING)
1927 queue_entry.host.update_field('dirty', 1)
showarda9545c02009-12-18 22:44:26 +00001928
1929
1930 def _finish_task(self):
1931 super(QueueTask, self)._finish_task()
1932
1933 for queue_entry in self.queue_entries:
1934 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00001935 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00001936
1937
mbligh4608b002010-01-05 18:22:35 +00001938class HostlessQueueTask(AbstractQueueTask):
1939 def __init__(self, queue_entry):
1940 super(HostlessQueueTask, self).__init__([queue_entry])
1941 self.queue_entry_ids = [queue_entry.id]
1942
1943
1944 def prolog(self):
1945 self.queue_entries[0].update_field('execution_subdir', 'hostless')
1946 super(HostlessQueueTask, self).prolog()
1947
1948
mbligh4608b002010-01-05 18:22:35 +00001949 def _finish_task(self):
1950 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00001951 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001952
1953
showardd3dc1992009-04-22 21:01:40 +00001954class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00001955 def __init__(self, queue_entries, log_file_name):
1956 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00001957
showardd1195652009-12-08 22:21:02 +00001958 self.queue_entries = queue_entries
1959
showardd3dc1992009-04-22 21:01:40 +00001960 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00001961 self._autoserv_monitor.attach_to_existing_process(
1962 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00001963
showardd1195652009-12-08 22:21:02 +00001964
1965 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00001966 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00001967 return 'true'
1968 return self._generate_command(
1969 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00001970
1971
1972 def _generate_command(self, results_dir):
1973 raise NotImplementedError('Subclasses must override this')
1974
1975
showardd1195652009-12-08 22:21:02 +00001976 @property
1977 def owner_username(self):
1978 return self.queue_entries[0].job.owner
1979
1980
1981 def _working_directory(self):
1982 return self._get_consistent_execution_path(self.queue_entries)
1983
1984
1985 def _paired_with_monitor(self):
1986 return self._autoserv_monitor
1987
1988
showardd3dc1992009-04-22 21:01:40 +00001989 def _job_was_aborted(self):
1990 was_aborted = None
showardd1195652009-12-08 22:21:02 +00001991 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00001992 queue_entry.update_from_database()
1993 if was_aborted is None: # first queue entry
1994 was_aborted = bool(queue_entry.aborted)
1995 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
jamesren17cadd62010-06-16 23:26:55 +00001996 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
1997 for entry in self.queue_entries]
showardd3dc1992009-04-22 21:01:40 +00001998 email_manager.manager.enqueue_notify_email(
jamesren17cadd62010-06-16 23:26:55 +00001999 'Inconsistent abort state',
2000 'Queue entries have inconsistent abort state:\n' +
2001 '\n'.join(entries))
showardd3dc1992009-04-22 21:01:40 +00002002 # don't crash here, just assume true
2003 return True
2004 return was_aborted
2005
2006
showardd1195652009-12-08 22:21:02 +00002007 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002008 if self._job_was_aborted():
2009 return models.HostQueueEntry.Status.ABORTED
2010
2011 # we'll use a PidfileRunMonitor to read the autoserv exit status
2012 if self._autoserv_monitor.exit_code() == 0:
2013 return models.HostQueueEntry.Status.COMPLETED
2014 return models.HostQueueEntry.Status.FAILED
2015
2016
showardd3dc1992009-04-22 21:01:40 +00002017 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002018 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002019 queue_entry.set_status(status)
2020
2021
2022 def abort(self):
2023 # override AgentTask.abort() to avoid killing the process and ending
2024 # the task. post-job tasks continue when the job is aborted.
2025 pass
2026
2027
mbligh4608b002010-01-05 18:22:35 +00002028 def _pidfile_label(self):
2029 # '.autoserv_execute' -> 'autoserv'
2030 return self._pidfile_name()[1:-len('_execute')]
2031
2032
showard9bb960b2009-11-19 01:02:11 +00002033class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002034 """
2035 Task responsible for
2036 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2037 * copying logs to the results repository
2038 * spawning CleanupTasks for hosts, if necessary
2039 * spawning a FinalReparseTask for the job
2040 """
showardd1195652009-12-08 22:21:02 +00002041 def __init__(self, queue_entries, recover_run_monitor=None):
2042 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002043 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002044 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002045 self._set_ids(queue_entries=queue_entries)
2046
2047
2048 def _generate_command(self, results_dir):
2049 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002050 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00002051        return [_autoserv_path, '-p',
2052 '--pidfile-label=%s' % self._pidfile_label(),
2053 '--use-existing-results', '--collect-crashinfo',
2054 '-m', host_list, '-r', results_dir]
showardd3dc1992009-04-22 21:01:40 +00002055
2056
showardd1195652009-12-08 22:21:02 +00002057 @property
2058 def num_processes(self):
2059 return len(self.queue_entries)
2060
2061
2062 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002063 return drone_manager.CRASHINFO_PID_FILE
showardd1195652009-12-08 22:21:02 +00002064
2065
showardd3dc1992009-04-22 21:01:40 +00002066 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002067 self._check_queue_entry_statuses(
2068 self.queue_entries,
2069 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
2070 allowed_host_statuses=(models.Host.Status.RUNNING,))
showard8cc058f2009-09-08 16:26:33 +00002071
showardd3dc1992009-04-22 21:01:40 +00002072 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002073
2074
showardd3dc1992009-04-22 21:01:40 +00002075 def epilog(self):
2076 super(GatherLogsTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002077 self._parse_results(self.queue_entries)
showard9bb960b2009-11-19 01:02:11 +00002078 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002079
showard9bb960b2009-11-19 01:02:11 +00002080
2081 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002082 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002083 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002084 models.HostQueueEntry.Status.COMPLETED)
2085 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2086 else:
2087 final_success = False
2088 num_tests_failed = 0
2089
showard9bb960b2009-11-19 01:02:11 +00002090 reboot_after = self._job.reboot_after
2091 do_reboot = (
2092 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002093 self._final_status() == models.HostQueueEntry.Status.ABORTED
jamesrendd855242010-03-02 22:23:44 +00002094 or reboot_after == model_attributes.RebootAfter.ALWAYS
2095 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
showard9bb960b2009-11-19 01:02:11 +00002096 and final_success and num_tests_failed == 0))
2097
showardd1195652009-12-08 22:21:02 +00002098 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002099 if do_reboot:
2100 # don't pass the queue entry to the CleanupTask. if the cleanup
2101 # fails, the job doesn't care -- it's over.
2102 models.SpecialTask.objects.create(
2103 host=models.Host.objects.get(id=queue_entry.host.id),
2104 task=models.SpecialTask.Task.CLEANUP,
2105 requested_by=self._job.owner_model())
2106 else:
2107 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002108
2109
showard0bbfc212009-04-29 21:06:13 +00002110 def run(self):
showard597bfd32009-05-08 18:22:50 +00002111 autoserv_exit_code = self._autoserv_monitor.exit_code()
2112 # only run if Autoserv exited due to some signal. if we have no exit
2113 # code, assume something bad (and signal-like) happened.
2114 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002115 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002116 else:
2117 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002118
2119
mbligh4608b002010-01-05 18:22:35 +00002120class SelfThrottledPostJobTask(PostJobTask):
2121 """
2122 Special AgentTask subclass that maintains its own global process limit.
2123 """
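    # The throttle is a class-level counter: run() declines to start a process
    # until _can_run_new_process() is true, tick() keeps retrying until the
    # start succeeds, and finished() releases the slot. A subclass mainly
    # supplies the limit, e.g. (a hypothetical sketch; real subclasses such as
    # FinalReparseTask below also override _generate_command and
    # _pidfile_name):
    #
    #   class ExampleThrottledTask(SelfThrottledPostJobTask):
    #       @classmethod
    #       def _max_processes(cls):
    #           return 2  # illustrative limit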
2124 _num_running_processes = 0
showarded2afea2009-07-07 20:54:07 +00002125
2126
mbligh4608b002010-01-05 18:22:35 +00002127 @classmethod
2128 def _increment_running_processes(cls):
2129 cls._num_running_processes += 1
mbligh16c722d2008-03-05 00:58:44 +00002130
mblighd5c95802008-03-05 00:33:46 +00002131
mbligh4608b002010-01-05 18:22:35 +00002132 @classmethod
2133 def _decrement_running_processes(cls):
2134 cls._num_running_processes -= 1
showard8cc058f2009-09-08 16:26:33 +00002135
2136
mbligh4608b002010-01-05 18:22:35 +00002137 @classmethod
2138 def _max_processes(cls):
2139 raise NotImplementedError
2140
2141
2142 @classmethod
2143 def _can_run_new_process(cls):
2144 return cls._num_running_processes < cls._max_processes()
2145
2146
2147 def _process_started(self):
2148 return bool(self.monitor)
2149
2150
2151 def tick(self):
2152 # override tick to keep trying to start until the process count goes
2153 # down and we can, at which point we revert to default behavior
2154 if self._process_started():
2155 super(SelfThrottledPostJobTask, self).tick()
2156 else:
2157 self._try_starting_process()
2158
2159
2160 def run(self):
2161 # override run() to not actually run unless we can
2162 self._try_starting_process()
2163
2164
2165 def _try_starting_process(self):
2166 if not self._can_run_new_process():
showard775300b2009-09-09 15:30:50 +00002167 return
2168
mbligh4608b002010-01-05 18:22:35 +00002169 # actually run the command
2170 super(SelfThrottledPostJobTask, self).run()
jamesren25663562010-04-27 18:00:55 +00002171 if self._process_started():
2172 self._increment_running_processes()
mblighd5c95802008-03-05 00:33:46 +00002173
mblighd5c95802008-03-05 00:33:46 +00002174
mbligh4608b002010-01-05 18:22:35 +00002175 def finished(self, success):
2176 super(SelfThrottledPostJobTask, self).finished(success)
2177 if self._process_started():
2178 self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002179
showard21baa452008-10-21 00:08:39 +00002180
mbligh4608b002010-01-05 18:22:35 +00002181class FinalReparseTask(SelfThrottledPostJobTask):
showardd1195652009-12-08 22:21:02 +00002182 def __init__(self, queue_entries):
2183 super(FinalReparseTask, self).__init__(queue_entries,
2184 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002185 # don't use _set_ids, since we don't want to set the host_ids
2186 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002187
2188
2189 def _generate_command(self, results_dir):
mbligh4608b002010-01-05 18:22:35 +00002190 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd1195652009-12-08 22:21:02 +00002191 results_dir]
2192
2193
2194 @property
2195 def num_processes(self):
2196 return 0 # don't include parser processes in accounting
2197
2198
2199 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002200 return drone_manager.PARSER_PID_FILE
showardd1195652009-12-08 22:21:02 +00002201
2202
showard97aed502008-11-04 02:01:24 +00002203 @classmethod
mbligh4608b002010-01-05 18:22:35 +00002204 def _max_processes(cls):
2205 return scheduler_config.config.max_parse_processes
showard97aed502008-11-04 02:01:24 +00002206
2207
2208 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002209 self._check_queue_entry_statuses(
2210 self.queue_entries,
2211 allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
showard8cc058f2009-09-08 16:26:33 +00002212
showard97aed502008-11-04 02:01:24 +00002213 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002214
2215
2216 def epilog(self):
2217 super(FinalReparseTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002218 self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002219
2220
mbligh4608b002010-01-05 18:22:35 +00002221class ArchiveResultsTask(SelfThrottledPostJobTask):
showarde1575b52010-01-15 00:21:12 +00002222 _ARCHIVING_FAILED_FILE = '.archiver_failed'
2223
mbligh4608b002010-01-05 18:22:35 +00002224 def __init__(self, queue_entries):
2225 super(ArchiveResultsTask, self).__init__(queue_entries,
2226 log_file_name='.archiving.log')
2227 # don't use _set_ids, since we don't want to set the host_ids
2228 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00002229
2230
mbligh4608b002010-01-05 18:22:35 +00002231 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002232 return drone_manager.ARCHIVER_PID_FILE
showard97aed502008-11-04 02:01:24 +00002233
2234
mbligh4608b002010-01-05 18:22:35 +00002235 def _generate_command(self, results_dir):
2236 return [_autoserv_path , '-p',
2237 '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
mblighe0cbc912010-03-11 18:03:07 +00002238 '--use-existing-results', '--control-filename=control.archive',
showard948eb302010-01-15 00:16:20 +00002239 os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
2240 'archive_results.control.srv')]
showard97aed502008-11-04 02:01:24 +00002241
2242
mbligh4608b002010-01-05 18:22:35 +00002243 @classmethod
2244 def _max_processes(cls):
2245 return scheduler_config.config.max_transfer_processes
showarda9545c02009-12-18 22:44:26 +00002246
2247
2248 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002249 self._check_queue_entry_statuses(
2250 self.queue_entries,
2251 allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
2252
2253 super(ArchiveResultsTask, self).prolog()
showarda9545c02009-12-18 22:44:26 +00002254
2255
mbligh4608b002010-01-05 18:22:35 +00002256 def epilog(self):
2257 super(ArchiveResultsTask, self).epilog()
showard4076c632010-01-15 20:28:49 +00002258 if not self.success and self._paired_with_monitor().has_process():
showarde1575b52010-01-15 00:21:12 +00002259 failed_file = os.path.join(self._working_directory(),
2260 self._ARCHIVING_FAILED_FILE)
2261 paired_process = self._paired_with_monitor().get_process()
2262 _drone_manager.write_lines_to_file(
2263 failed_file, ['Archiving failed with exit code %s'
2264 % self.monitor.exit_code()],
2265 paired_with_process=paired_process)
mbligh4608b002010-01-05 18:22:35 +00002266 self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002267
2268
mbligh36768f02008-02-22 18:28:33 +00002269if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002270 main()