#!/usr/bin/python -u
#pylint: disable-msg=C0111

"""
Autotest scheduler
"""


import datetime, optparse, os, signal
import sys, time, traceback, urllib
import logging, gc

import common
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_testing_mode = False
_drone_manager = None


def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60
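
# Illustration only (hypothetical config values, not from the original source):
# with a global_config entry such as
#     [SCHEDULER]
#     pidfile_timeout_mins: 5
# _get_pidfile_timeout_secs() above would return 300 seconds.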


def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
            "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
            _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
            logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
                DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, and --test-retry
            parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    autoserv_argv = [_autoserv_path, '-p',
                     '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        autoserv_argv += ['-m', machines]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
        if job.test_retry:
            autoserv_argv += ['--test-retry='+str(job.test_retry)]
    if verbose:
        autoserv_argv.append('--verbose')
    return autoserv_argv + extra_args
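    # Illustrative example only (the concrete values are hypothetical, not from
    # the original source): for a job with owner 'someuser', name 'somejob' and
    # test_retry=2, run on machines='host1,host2' with verbose=True and
    # extra_args=[], the list returned above would look roughly like:
    #     [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY,
    #      '-m', 'host1,host2', '-u', 'someuser', '-l', 'somejob',
    #      '--test-retry=2', '--verbose']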


class BaseDispatcher(object):


    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def _log_extra_msg(self, msg):
        if self._extra_debugging:
            logging.debug(msg)


    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.
        """
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()

    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                           % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)
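        # Illustration only (hypothetical data, not from the original source):
        # if the queued SpecialTasks were [VERIFY, REPAIR, CLEANUP], the
        # sorted() call above would return them as [REPAIR, CLEANUP, VERIFY].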


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)



    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
                joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
                where='NOT complete AND NOT active AND status="Queued"',
                order_by='afe_jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        logging.debug('Processing %d queue_entries', len(queue_entries))
        for queue_entry in queue_entries:
            self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._log_extra_msg('Scheduling hostless job.')
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)

    def _schedule_running_host_queue_entries(self):
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
                            queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
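        # Worked example (numbers are hypothetical, not from the original
        # source): with max_processes_started_per_cycle = 50 and 48 processes
        # already started this cycle, an agent whose task needs 4 processes is
        # rejected (48 + 4 > 50); a zero-process agent is still allowed, and
        # the first nonzero-process agent of a cycle bypasses the per-cycle
        # limit, though it must still fit under the drone manager's
        # total-process limit.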


    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_cycle += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes. %d added this cycle.',
                     _drone_manager.total_running_processes(),
                     num_started_this_cycle)


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


SiteDispatcher = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    pass


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """
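
    # Typical usage (sketch only; the concrete argument values below are
    # hypothetical, not taken from the original source):
    #     monitor = PidfileRunMonitor()
    #     monitor.run(command, working_directory, num_processes=1,
    #                 nice_level=AUTOSERV_NICE_LEVEL, log_file=log_file)
    #     ...then poll monitor.exit_code() each tick until it is not None...
    # or, when recovering an autoserv process that is already running:
    #     monitor.attach_to_existing_process(execution_path)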

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, num_processes, nice_level=None,
            log_file=None, pidfile_name=None, paired_with_pidfile=None,
            username=None, drone_hostnames_allowed=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
                command, working_directory, pidfile_name=pidfile_name,
                num_processes=num_processes, log_file=log_file,
                paired_with_pidfile=paired_with_pidfile, username=username,
                drone_hostnames_allowed=drone_hostnames_allowed)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=drone_manager.AUTOSERV_PID_FILE,
                                   num_processes=None):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
                execution_path, pidfile_name=pidfile_name)
        if num_processes is not None:
            _drone_manager.declare_process_count(self.pidfile_id, num_processes)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
                'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
                self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
                self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                        'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
            pid=None, exit_status=None if autoserv has not yet run
            pid!=None, exit_status=None if autoserv is running
            pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        if time.time() - self._start_time > _get_pidfile_timeout_secs():
            email_manager.manager.enqueue_notify_email(
                    'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """@returns The number of tests that failed or -1 if unknown."""
        self._get_pidfile_info()
        if self._state.num_tests_failed is None:
            return -1
        return self._state.num_tests_failed


    def try_copy_results_on_drone(self, **kwargs):
        if self.has_process():
            # copy results logs into the normal place for job results
            _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)


    def try_copy_to_results_repository(self, source, **kwargs):
        if self.has_process():
            _drone_manager.copy_to_results_repository(self.get_process(),
                                                      source, **kwargs)

class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested. The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """
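
    # Minimal sketch of a conforming task object (illustrative only; the real
    # tasks handed to Agent in this file are AgentTask subclasses such as
    # QueueTask). Note that the dispatcher's throttling also reads
    # task.num_processes, task.owner_username and
    # task.get_drone_hostnames_allowed() (see _can_start_agent above).
    #
    #     class NoopTask(object):
    #         aborted = False
    #         success = True
    #         queue_entry_ids = ()
    #         host_ids = ()
    #         num_processes = 0
    #         def poll(self): pass
    #         def is_done(self): return True
    #         def abort(self): self.aborted = True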
1084
1085
showard418785b2009-11-23 20:19:59 +00001086 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001087 """
showard8cc058f2009-09-08 16:26:33 +00001088 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001089 """
showard8cc058f2009-09-08 16:26:33 +00001090 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001091
showard77182562009-06-10 00:16:05 +00001092 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001093 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001094
showard8cc058f2009-09-08 16:26:33 +00001095 self.queue_entry_ids = task.queue_entry_ids
1096 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001097
showard8cc058f2009-09-08 16:26:33 +00001098 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001099 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001100
1101
jadmanski0afbb632008-06-06 21:10:57 +00001102 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001103 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001104 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001105 self.task.poll()
1106 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001107 self.finished = True
showardec113162008-05-08 00:52:49 +00001108
1109
jadmanski0afbb632008-06-06 21:10:57 +00001110 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001111 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001112
1113
showardd3dc1992009-04-22 21:01:40 +00001114 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001115 if self.task:
1116 self.task.abort()
1117 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001118 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001119 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001120
showardd3dc1992009-04-22 21:01:40 +00001121
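# Illustrative sketch (not part of the scheduler; all names here are
# hypothetical): a minimal task object satisfying the interface described in
# the Agent docstring above, plus a helper showing how a dispatcher-style loop
# would drive it to completion.  Nothing below is invoked by the scheduler.
class _ExampleNoopTask(object):
    """Example task: succeeds on the first poll()."""
    def __init__(self):
        self.aborted = False
        self.success = None
        self.queue_entry_ids = []
        self.host_ids = []
        self._done = False


    def poll(self):
        # do one unit of "work", then mark ourselves finished
        self._done = True
        self.success = True


    def is_done(self):
        return self._done


    def abort(self):
        self.aborted = True
        self._done = True


def _example_drive_agent():
    """Example only: poll an Agent until its task reports completion."""
    agent = Agent(_ExampleNoopTask())
    while not agent.is_done():
        agent.tick()
    return agent.task.success

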
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001122class BaseAgentTask(object):
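    """
    Base class for the tasks that Agents run (see AgentTask below).

    Lifecycle, as driven by repeated poll() calls from the Agent: the first
    poll() calls start(), which runs prolog() and run(); every poll() then
    calls tick(), which checks the monitored process's exit code and, once it
    is available, calls finished(), which in turn runs epilog().
    """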
showardd1195652009-12-08 22:21:02 +00001123 class _NullMonitor(object):
1124 pidfile_id = None
1125
1126 def has_process(self):
1127 return True
1128
1129
1130 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001131 """
showardd1195652009-12-08 22:21:02 +00001132 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001133 """
jadmanski0afbb632008-06-06 21:10:57 +00001134 self.done = False
showardd1195652009-12-08 22:21:02 +00001135 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001136 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001137 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001138 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001139 self.queue_entry_ids = []
1140 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001141 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001142
1143
1144 def _set_ids(self, host=None, queue_entries=None):
1145 if queue_entries and queue_entries != [None]:
1146 self.host_ids = [entry.host.id for entry in queue_entries]
1147 self.queue_entry_ids = [entry.id for entry in queue_entries]
1148 else:
1149 assert host
1150 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001151
1152
jadmanski0afbb632008-06-06 21:10:57 +00001153 def poll(self):
showard08a36412009-05-05 01:01:13 +00001154 if not self.started:
1155 self.start()
showardd1195652009-12-08 22:21:02 +00001156 if not self.done:
1157 self.tick()
showard08a36412009-05-05 01:01:13 +00001158
1159
1160 def tick(self):
showardd1195652009-12-08 22:21:02 +00001161 assert self.monitor
1162 exit_code = self.monitor.exit_code()
1163 if exit_code is None:
1164 return
mbligh36768f02008-02-22 18:28:33 +00001165
showardd1195652009-12-08 22:21:02 +00001166 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001167 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001168
1169
jadmanski0afbb632008-06-06 21:10:57 +00001170 def is_done(self):
1171 return self.done
mbligh36768f02008-02-22 18:28:33 +00001172
1173
jadmanski0afbb632008-06-06 21:10:57 +00001174 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001175 if self.done:
showardd1195652009-12-08 22:21:02 +00001176 assert self.started
showard08a36412009-05-05 01:01:13 +00001177 return
showardd1195652009-12-08 22:21:02 +00001178 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001179 self.done = True
1180 self.success = success
1181 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001182
1183
jadmanski0afbb632008-06-06 21:10:57 +00001184 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001185 """
1186 To be overridden.
1187 """
showarded2afea2009-07-07 20:54:07 +00001188 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001189 self.register_necessary_pidfiles()
1190
1191
1192 def _log_file(self):
1193 if not self._log_file_name:
1194 return None
1195 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001196
mbligh36768f02008-02-22 18:28:33 +00001197
jadmanski0afbb632008-06-06 21:10:57 +00001198 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001199 log_file = self._log_file()
1200 if self.monitor and log_file:
1201 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001202
1203
jadmanski0afbb632008-06-06 21:10:57 +00001204 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001205 """
1206 To be overridden.
1207 """
jadmanski0afbb632008-06-06 21:10:57 +00001208 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001209 logging.info("%s finished with success=%s", type(self).__name__,
1210 self.success)
1211
mbligh36768f02008-02-22 18:28:33 +00001212
1213
jadmanski0afbb632008-06-06 21:10:57 +00001214 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001215 if not self.started:
1216 self.prolog()
1217 self.run()
1218
1219 self.started = True
1220
1221
1222 def abort(self):
1223 if self.monitor:
1224 self.monitor.kill()
1225 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001226 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001227 self.cleanup()
1228
1229
showarded2afea2009-07-07 20:54:07 +00001230 def _get_consistent_execution_path(self, execution_entries):
1231 first_execution_path = execution_entries[0].execution_path()
1232 for execution_entry in execution_entries[1:]:
1233 assert execution_entry.execution_path() == first_execution_path, (
1234 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1235 execution_entry,
1236 first_execution_path,
1237 execution_entries[0]))
1238 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001239
1240
showarded2afea2009-07-07 20:54:07 +00001241 def _copy_results(self, execution_entries, use_monitor=None):
1242 """
1243 @param execution_entries: list of objects with execution_path() method
1244 """
showard6d1c1432009-08-20 23:30:39 +00001245 if use_monitor is not None and not use_monitor.has_process():
1246 return
1247
showarded2afea2009-07-07 20:54:07 +00001248 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001249 if use_monitor is None:
1250 assert self.monitor
1251 use_monitor = self.monitor
1252 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001253 execution_path = self._get_consistent_execution_path(execution_entries)
1254 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001255 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001256
showarda1e74b32009-05-12 17:32:04 +00001257
1258 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001259 for queue_entry in queue_entries:
1260 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001261
1262
mbligh4608b002010-01-05 18:22:35 +00001263 def _archive_results(self, queue_entries):
1264 for queue_entry in queue_entries:
1265 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001266
1267
showardd1195652009-12-08 22:21:02 +00001268 def _command_line(self):
1269 """
1270 Return the command line to run. Must be overridden.
1271 """
1272 raise NotImplementedError
1273
1274
1275 @property
1276 def num_processes(self):
1277 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001278 Return the number of processes forked by this BaseAgentTask's process.
1279 It may only be approximate. To be overridden if necessary.
showardd1195652009-12-08 22:21:02 +00001280 """
1281 return 1
1282
1283
1284 def _paired_with_monitor(self):
1285 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001286 If this BaseAgentTask's process must run on the same machine as some
showardd1195652009-12-08 22:21:02 +00001287 previous process, this method should be overridden to return a
1288 PidfileRunMonitor for that process.
1289 """
1290 return self._NullMonitor()
1291
1292
1293 @property
1294 def owner_username(self):
1295 """
1296 Return login of user responsible for this task. May be None. Must be
1297 overridden.
1298 """
1299 raise NotImplementedError
1300
1301
1302 def _working_directory(self):
1303 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001304 Return the directory where this BaseAgentTask's process executes.
1305 Must be overridden.
showardd1195652009-12-08 22:21:02 +00001306 """
1307 raise NotImplementedError
1308
1309
1310 def _pidfile_name(self):
1311 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001312 Return the name of the pidfile this BaseAgentTask's process uses. To be
showardd1195652009-12-08 22:21:02 +00001313 overridden if necessary.
1314 """
jamesrenc44ae992010-02-19 00:12:54 +00001315 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001316
1317
1318 def _check_paired_results_exist(self):
1319 if not self._paired_with_monitor().has_process():
1320 email_manager.manager.enqueue_notify_email(
1321 'No paired results in task',
1322 'No paired results in task %s at %s'
1323 % (self, self._paired_with_monitor().pidfile_id))
1324 self.finished(False)
1325 return False
1326 return True
1327
1328
1329 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001330 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001331 self.monitor = PidfileRunMonitor()
1332
1333
1334 def run(self):
1335 if not self._check_paired_results_exist():
1336 return
1337
1338 self._create_monitor()
1339 self.monitor.run(
1340 self._command_line(), self._working_directory(),
1341 num_processes=self.num_processes,
1342 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1343 pidfile_name=self._pidfile_name(),
1344 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001345 username=self.owner_username,
1346 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1347
1348
1349 def get_drone_hostnames_allowed(self):
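        # Resolution order: None when drone sets are disabled; for special
        # tasks with no queue entries, the requesting user's (or the global)
        # default drone set; otherwise the job's drone set, falling back to
        # the job owner's (or the global) default when the job has none.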
1350 if not models.DroneSet.drone_sets_enabled():
1351 return None
1352
1353 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1354 if not hqes:
1355 # Only special tasks could be missing host queue entries
1356 assert isinstance(self, SpecialAgentTask)
1357 return self._user_or_global_default_drone_set(
1358 self.task, self.task.requested_by)
1359
1360 job_ids = hqes.values_list('job', flat=True).distinct()
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001361 assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
jamesren76fcf192010-04-21 20:39:50 +00001362 "span multiple jobs")
1363
1364 job = models.Job.objects.get(id=job_ids[0])
1365 drone_set = job.drone_set
1366 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001367 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001368
1369 return drone_set.get_drone_hostnames()
1370
1371
1372 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1373 """
1374 Returns the user's default drone set, if present.
1375
1376 Otherwise, returns the global default drone set.
1377 """
1378 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1379 if not user:
1380 logging.warn('%s had no owner; using default drone set',
1381 obj_with_owner)
1382 return default_hostnames
1383 if not user.drone_set:
1384 logging.warn('User %s has no default drone set, using global '
1385 'default', user.login)
1386 return default_hostnames
1387 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001388
1389
1390 def register_necessary_pidfiles(self):
1391 pidfile_id = _drone_manager.get_pidfile_id_from(
1392 self._working_directory(), self._pidfile_name())
1393 _drone_manager.register_pidfile(pidfile_id)
1394
1395 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1396 if paired_pidfile_id:
1397 _drone_manager.register_pidfile(paired_pidfile_id)
1398
1399
1400 def recover(self):
1401 if not self._check_paired_results_exist():
1402 return
1403
1404 self._create_monitor()
1405 self.monitor.attach_to_existing_process(
1406 self._working_directory(), pidfile_name=self._pidfile_name(),
1407 num_processes=self.num_processes)
1408 if not self.monitor.has_process():
1409 # no process to recover; wait to be started normally
1410 self.monitor = None
1411 return
1412
1413 self.started = True
Aviv Keshet225bdfe2013-03-05 10:10:08 -08001414 logging.info('Recovering process %s for %s at %s',
1415 self.monitor.get_process(), type(self).__name__,
1416 self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001417
1418
mbligh4608b002010-01-05 18:22:35 +00001419 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1420 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001421 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001422 for entry in queue_entries:
1423 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001424 raise host_scheduler.SchedulerError(
1425 '%s attempting to start entry with invalid status %s: '
1426 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001427 invalid_host_status = (
1428 allowed_host_statuses is not None
1429 and entry.host.status not in allowed_host_statuses)
1430 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001431 raise host_scheduler.SchedulerError(
1432 '%s attempting to start on queue entry with invalid '
1433 'host status %s: %s'
1434 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001435
1436
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001437SiteAgentTask = utils.import_site_class(
1438 __file__, 'autotest_lib.scheduler.site_monitor_db',
1439 'SiteAgentTask', BaseAgentTask)
1440
1441class AgentTask(SiteAgentTask):
1442 pass
1443
1444
showardd9205182009-04-27 20:09:55 +00001445class TaskWithJobKeyvals(object):
1446 """AgentTask mixin providing functionality to help with job keyval files."""
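    # Keyval files hold one 'key=value' pair per line and always end with a
    # trailing newline so later stages can append further entries.
    # Illustrative contents for a hypothetical job (timestamps are epoch
    # seconds):
    #
    #     job_queued=1262304000
    #     job_finished=1262307600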
1447 _KEYVAL_FILE = 'keyval'
1448 def _format_keyval(self, key, value):
1449 return '%s=%s' % (key, value)
1450
1451
1452 def _keyval_path(self):
1453 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001454 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001455
1456
1457 def _write_keyval_after_job(self, field, value):
1458 assert self.monitor
1459 if not self.monitor.has_process():
1460 return
1461 _drone_manager.write_lines_to_file(
1462 self._keyval_path(), [self._format_keyval(field, value)],
1463 paired_with_process=self.monitor.get_process())
1464
1465
1466 def _job_queued_keyval(self, job):
1467 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1468
1469
1470 def _write_job_finished(self):
1471 self._write_keyval_after_job("job_finished", int(time.time()))
1472
1473
showarddb502762009-09-09 15:31:20 +00001474 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1475 keyval_contents = '\n'.join(self._format_keyval(key, value)
1476 for key, value in keyval_dict.iteritems())
1477 # always end with a newline to allow additional keyvals to be written
1478 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001479 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001480 keyval_contents,
1481 file_path=keyval_path)
1482
1483
1484 def _write_keyvals_before_job(self, keyval_dict):
1485 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1486
1487
1488 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001489 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001490 host.hostname)
1491 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001492 all_labels = [ urllib.quote(label) for label in all_labels ]
showarddb502762009-09-09 15:31:20 +00001493 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1494 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1495
1496
showard8cc058f2009-09-08 16:26:33 +00001497class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001498 """
1499 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1500 """
1501
1502 TASK_TYPE = None
1503 host = None
1504 queue_entry = None
1505
showardd1195652009-12-08 22:21:02 +00001506 def __init__(self, task, extra_command_args):
1507 super(SpecialAgentTask, self).__init__()
1508
lmrb7c5d272010-04-16 06:34:04 +00001509 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001510
jamesrenc44ae992010-02-19 00:12:54 +00001511 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001512 self.queue_entry = None
1513 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001514 self.queue_entry = scheduler_models.HostQueueEntry(
1515 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001516
showarded2afea2009-07-07 20:54:07 +00001517 self.task = task
1518 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001519
1520
showard8cc058f2009-09-08 16:26:33 +00001521 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001522 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1523
1524
1525 def _command_line(self):
1526 return _autoserv_command_line(self.host.hostname,
1527 self._extra_command_args,
1528 queue_entry=self.queue_entry)
1529
1530
1531 def _working_directory(self):
1532 return self.task.execution_path()
1533
1534
1535 @property
1536 def owner_username(self):
1537 if self.task.requested_by:
1538 return self.task.requested_by.login
1539 return None
showard8cc058f2009-09-08 16:26:33 +00001540
1541
showarded2afea2009-07-07 20:54:07 +00001542 def prolog(self):
1543 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001544 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001545 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001546
1547
showardde634ee2009-01-30 01:44:24 +00001548 def _fail_queue_entry(self):
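        # In outline: skip metahost and already-aborted entries, write the job
        # keyvals, copy this task's logs into the entry's execution directory,
        # and hand the entry to parsing (or archiving) for a final status.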
showard2fe3f1d2009-07-06 20:19:11 +00001549 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001550
showard2fe3f1d2009-07-06 20:19:11 +00001551 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001552 return # don't fail metahost entries, they'll be reassigned
1553
showard2fe3f1d2009-07-06 20:19:11 +00001554 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001555 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001556 return # entry has been aborted
1557
showard2fe3f1d2009-07-06 20:19:11 +00001558 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001559 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001560 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001561 self._write_keyval_after_job(queued_key, queued_time)
1562 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001563
showard8cc058f2009-09-08 16:26:33 +00001564 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001565 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001566 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001567 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001568
showard8cc058f2009-09-08 16:26:33 +00001569 pidfile_id = _drone_manager.get_pidfile_id_from(
1570 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001571 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001572 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001573
1574 if self.queue_entry.job.parse_failed_repair:
1575 self._parse_results([self.queue_entry])
1576 else:
1577 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001578
1579
1580 def cleanup(self):
1581 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001582
1583 # We will consider an aborted task to be "Failed"
1584 self.task.finish(bool(self.success))
1585
showardf85a0b72009-10-07 20:48:45 +00001586 if self.monitor:
1587 if self.monitor.has_process():
1588 self._copy_results([self.task])
1589 if self.monitor.pidfile_id is not None:
1590 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001591
1592
1593class RepairTask(SpecialAgentTask):
1594 TASK_TYPE = models.SpecialTask.Task.REPAIR
1595
1596
showardd1195652009-12-08 22:21:02 +00001597 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001598 """\
1599        task: the SpecialTask to run; its queue entry, if any, is marked failed when the repair fails.
1600 """
1601 protection = host_protections.Protection.get_string(
1602 task.host.protection)
1603 # normalize the protection name
1604 protection = host_protections.Protection.get_attr_name(protection)
1605
1606 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001607 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001608
1609 # *don't* include the queue entry in IDs -- if the queue entry is
1610 # aborted, we want to leave the repair task running
1611 self._set_ids(host=self.host)
1612
1613
1614 def prolog(self):
1615 super(RepairTask, self).prolog()
1616 logging.info("repair_task starting")
1617 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001618
1619
jadmanski0afbb632008-06-06 21:10:57 +00001620 def epilog(self):
1621 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001622
jadmanski0afbb632008-06-06 21:10:57 +00001623 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001624 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001625 else:
showard8cc058f2009-09-08 16:26:33 +00001626 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001627 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001628 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001629
1630
showarded2afea2009-07-07 20:54:07 +00001631class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001632 def _copy_to_results_repository(self):
1633 if not self.queue_entry or self.queue_entry.meta_host:
1634 return
1635
1636 self.queue_entry.set_execution_subdir()
1637 log_name = os.path.basename(self.task.execution_path())
1638 source = os.path.join(self.task.execution_path(), 'debug',
1639 'autoserv.DEBUG')
1640 destination = os.path.join(
1641 self.queue_entry.execution_path(), log_name)
1642
1643 self.monitor.try_copy_to_results_repository(
1644 source, destination_path=destination)
1645
1646
showard170873e2009-01-07 00:22:26 +00001647 def epilog(self):
1648 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001649
showard775300b2009-09-09 15:30:50 +00001650 if self.success:
1651 return
showard8fe93b52008-11-18 17:53:22 +00001652
showard775300b2009-09-09 15:30:50 +00001653 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001654
showard775300b2009-09-09 15:30:50 +00001655 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001656 # effectively ignore failure for these hosts
1657 self.success = True
showard775300b2009-09-09 15:30:50 +00001658 return
1659
1660 if self.queue_entry:
1661 self.queue_entry.requeue()
1662
1663 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001664 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001665 queue_entry__id=self.queue_entry.id):
1666 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1667 self._fail_queue_entry()
1668 return
1669
showard9bb960b2009-11-19 01:02:11 +00001670 queue_entry = models.HostQueueEntry.objects.get(
1671 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001672 else:
1673 queue_entry = None
1674
1675 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001676 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001677 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001678 queue_entry=queue_entry,
1679 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001680
showard8fe93b52008-11-18 17:53:22 +00001681
1682class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001683 TASK_TYPE = models.SpecialTask.Task.VERIFY
1684
1685
showardd1195652009-12-08 22:21:02 +00001686 def __init__(self, task):
1687 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001688 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001689
1690
jadmanski0afbb632008-06-06 21:10:57 +00001691 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001692 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001693
showardb18134f2009-03-20 20:52:18 +00001694 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001695 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001696 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1697 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001698
jamesren42318f72010-05-10 23:40:59 +00001699 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00001700 # and there's no need to keep records of other requests.
1701 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00001702 host__id=self.host.id,
1703 task=models.SpecialTask.Task.VERIFY,
jamesren42318f72010-05-10 23:40:59 +00001704 is_active=False, is_complete=False, queue_entry=None)
showarded2afea2009-07-07 20:54:07 +00001705 queued_verifies = queued_verifies.exclude(id=self.task.id)
1706 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00001707
mbligh36768f02008-02-22 18:28:33 +00001708
jadmanski0afbb632008-06-06 21:10:57 +00001709 def epilog(self):
1710 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001711 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001712 if self.queue_entry:
1713 self.queue_entry.on_pending()
1714 else:
1715 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001716
1717
mbligh4608b002010-01-05 18:22:35 +00001718class CleanupTask(PreJobTask):
1719 # note this can also run post-job, but when it does, it's running standalone
1720 # against the host (not related to the job), so it's not considered a
1721 # PostJobTask
1722
1723 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1724
1725
1726 def __init__(self, task, recover_run_monitor=None):
1727 super(CleanupTask, self).__init__(task, ['--cleanup'])
1728 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1729
1730
1731 def prolog(self):
1732 super(CleanupTask, self).prolog()
1733 logging.info("starting cleanup task for host: %s", self.host.hostname)
1734 self.host.set_status(models.Host.Status.CLEANING)
1735 if self.queue_entry:
1736 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1737
1738
1739 def _finish_epilog(self):
1740 if not self.queue_entry or not self.success:
1741 return
1742
1743 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
1744 should_run_verify = (
1745 self.queue_entry.job.run_verify
1746 and self.host.protection != do_not_verify_protection)
1747 if should_run_verify:
1748 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1749 models.SpecialTask.objects.create(
1750 host=models.Host.objects.get(id=self.host.id),
1751 queue_entry=entry,
1752 task=models.SpecialTask.Task.VERIFY)
1753 else:
1754 self.queue_entry.on_pending()
1755
1756
1757 def epilog(self):
1758 super(CleanupTask, self).epilog()
1759
1760 if self.success:
1761 self.host.update_field('dirty', 0)
1762 self.host.set_status(models.Host.Status.READY)
1763
1764 self._finish_epilog()
1765
1766
showarda9545c02009-12-18 22:44:26 +00001767class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
1768 """
1769 Common functionality for QueueTask and HostlessQueueTask
1770 """
1771 def __init__(self, queue_entries):
1772 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001773 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001774 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001775
1776
showard73ec0442009-02-07 02:05:20 +00001777 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001778 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001779
1780
jamesrenc44ae992010-02-19 00:12:54 +00001781 def _write_control_file(self, execution_path):
1782 control_path = _drone_manager.attach_file_to_execution(
1783 execution_path, self.job.control_file)
1784 return control_path
1785
1786
showardd1195652009-12-08 22:21:02 +00001787 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001788 execution_path = self.queue_entries[0].execution_path()
1789 control_path = self._write_control_file(execution_path)
1790 hostnames = ','.join(entry.host.hostname
1791 for entry in self.queue_entries
1792 if not entry.is_hostless())
1793
1794 execution_tag = self.queue_entries[0].execution_tag()
1795 params = _autoserv_command_line(
1796 hostnames,
1797 ['-P', execution_tag, '-n',
1798 _drone_manager.absolute_path(control_path)],
1799 job=self.job, verbose=False)
1800
1801 if not self.job.is_server_job():
1802 params.append('-c')
1803
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001804 if self.job.is_image_update_job():
1805 params += ['--image', self.job.update_image_path]
1806
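        # Illustrative shape of the result (paths and tag hypothetical): the
        # base invocation built by _autoserv_command_line() for the hostnames
        # above, with '-P <execution tag>' and '-n <control file path>'
        # included, plus '-c' for client-side jobs and '--image <path>' for
        # image-update jobs as appended above.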
jamesrenc44ae992010-02-19 00:12:54 +00001807 return params
showardd1195652009-12-08 22:21:02 +00001808
1809
1810 @property
1811 def num_processes(self):
1812 return len(self.queue_entries)
1813
1814
1815 @property
1816 def owner_username(self):
1817 return self.job.owner
1818
1819
1820 def _working_directory(self):
1821 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001822
1823
jadmanski0afbb632008-06-06 21:10:57 +00001824 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001825 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001826 keyval_dict = self.job.keyval_dict()
1827 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001828 group_name = self.queue_entries[0].get_group_name()
1829 if group_name:
1830 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001831 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001832 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001833 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001834 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001835
1836
showard35162b02009-03-03 02:17:30 +00001837 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001838 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001839 _drone_manager.write_lines_to_file(error_file_path,
1840 [_LOST_PROCESS_ERROR])
1841
1842
showardd3dc1992009-04-22 21:01:40 +00001843 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001844 if not self.monitor:
1845 return
1846
showardd9205182009-04-27 20:09:55 +00001847 self._write_job_finished()
1848
showard35162b02009-03-03 02:17:30 +00001849 if self.monitor.lost_process:
1850 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001851
jadmanskif7fa2cc2008-10-01 14:13:23 +00001852
showardcbd74612008-11-19 21:42:02 +00001853 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001854 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001855 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001856 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001857 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001858
1859
jadmanskif7fa2cc2008-10-01 14:13:23 +00001860 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001861 if not self.monitor or not self.monitor.has_process():
1862 return
1863
jadmanskif7fa2cc2008-10-01 14:13:23 +00001864 # build up sets of all the aborted_by and aborted_on values
1865 aborted_by, aborted_on = set(), set()
1866 for queue_entry in self.queue_entries:
1867 if queue_entry.aborted_by:
1868 aborted_by.add(queue_entry.aborted_by)
1869 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1870 aborted_on.add(t)
1871
1872 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001873 # TODO(showard): this conditional is now obsolete, we just need to leave
1874 # it in temporarily for backwards compatibility over upgrades. delete
1875 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001876 assert len(aborted_by) <= 1
1877 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001878 aborted_by_value = aborted_by.pop()
1879 aborted_on_value = max(aborted_on)
1880 else:
1881 aborted_by_value = 'autotest_system'
1882 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001883
showarda0382352009-02-11 23:36:43 +00001884 self._write_keyval_after_job("aborted_by", aborted_by_value)
1885 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001886
showardcbd74612008-11-19 21:42:02 +00001887 aborted_on_string = str(datetime.datetime.fromtimestamp(
1888 aborted_on_value))
1889 self._write_status_comment('Job aborted by %s on %s' %
1890 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001891
1892
jadmanski0afbb632008-06-06 21:10:57 +00001893 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001894 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001895 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001896 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001897
1898
jadmanski0afbb632008-06-06 21:10:57 +00001899 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001900 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001901 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001902
1903
1904class QueueTask(AbstractQueueTask):
1905 def __init__(self, queue_entries):
1906 super(QueueTask, self).__init__(queue_entries)
1907 self._set_ids(queue_entries=queue_entries)
1908
1909
1910 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001911 self._check_queue_entry_statuses(
1912 self.queue_entries,
1913 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
1914 models.HostQueueEntry.Status.RUNNING),
1915 allowed_host_statuses=(models.Host.Status.PENDING,
1916 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00001917
1918 super(QueueTask, self).prolog()
1919
1920 for queue_entry in self.queue_entries:
1921 self._write_host_keyvals(queue_entry.host)
1922 queue_entry.host.set_status(models.Host.Status.RUNNING)
1923 queue_entry.host.update_field('dirty', 1)
1924 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
1925 # TODO(gps): Remove this if nothing needs it anymore.
1926 # A potential user is: tko/parser
1927 self.job.write_to_machines_file(self.queue_entries[0])
1928
1929
1930 def _finish_task(self):
1931 super(QueueTask, self)._finish_task()
1932
1933 for queue_entry in self.queue_entries:
1934 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00001935 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00001936
1937
mbligh4608b002010-01-05 18:22:35 +00001938class HostlessQueueTask(AbstractQueueTask):
1939 def __init__(self, queue_entry):
1940 super(HostlessQueueTask, self).__init__([queue_entry])
1941 self.queue_entry_ids = [queue_entry.id]
1942
1943
1944 def prolog(self):
1945 self.queue_entries[0].update_field('execution_subdir', 'hostless')
1946 super(HostlessQueueTask, self).prolog()
1947
1948
mbligh4608b002010-01-05 18:22:35 +00001949 def _finish_task(self):
1950 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00001951 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001952
1953
showardd3dc1992009-04-22 21:01:40 +00001954class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00001955 def __init__(self, queue_entries, log_file_name):
1956 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00001957
showardd1195652009-12-08 22:21:02 +00001958 self.queue_entries = queue_entries
1959
showardd3dc1992009-04-22 21:01:40 +00001960 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00001961 self._autoserv_monitor.attach_to_existing_process(
1962 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00001963
showardd1195652009-12-08 22:21:02 +00001964
1965 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00001966 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00001967 return 'true'
1968 return self._generate_command(
1969 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00001970
1971
1972 def _generate_command(self, results_dir):
1973 raise NotImplementedError('Subclasses must override this')
1974
1975
showardd1195652009-12-08 22:21:02 +00001976 @property
1977 def owner_username(self):
1978 return self.queue_entries[0].job.owner
1979
1980
1981 def _working_directory(self):
1982 return self._get_consistent_execution_path(self.queue_entries)
1983
1984
1985 def _paired_with_monitor(self):
1986 return self._autoserv_monitor
1987
1988
showardd3dc1992009-04-22 21:01:40 +00001989 def _job_was_aborted(self):
1990 was_aborted = None
showardd1195652009-12-08 22:21:02 +00001991 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00001992 queue_entry.update_from_database()
1993 if was_aborted is None: # first queue entry
1994 was_aborted = bool(queue_entry.aborted)
1995 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
jamesren17cadd62010-06-16 23:26:55 +00001996 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
1997 for entry in self.queue_entries]
showardd3dc1992009-04-22 21:01:40 +00001998 email_manager.manager.enqueue_notify_email(
jamesren17cadd62010-06-16 23:26:55 +00001999 'Inconsistent abort state',
2000 'Queue entries have inconsistent abort state:\n' +
2001 '\n'.join(entries))
showardd3dc1992009-04-22 21:01:40 +00002002 # don't crash here, just assume true
2003 return True
2004 return was_aborted
2005
2006
showardd1195652009-12-08 22:21:02 +00002007 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002008 if self._job_was_aborted():
2009 return models.HostQueueEntry.Status.ABORTED
2010
2011 # we'll use a PidfileRunMonitor to read the autoserv exit status
2012 if self._autoserv_monitor.exit_code() == 0:
2013 return models.HostQueueEntry.Status.COMPLETED
2014 return models.HostQueueEntry.Status.FAILED
2015
2016
showardd3dc1992009-04-22 21:01:40 +00002017 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002018 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002019 queue_entry.set_status(status)
2020
2021
2022 def abort(self):
2023 # override AgentTask.abort() to avoid killing the process and ending
2024 # the task. post-job tasks continue when the job is aborted.
2025 pass
2026
2027
mbligh4608b002010-01-05 18:22:35 +00002028 def _pidfile_label(self):
2029 # '.autoserv_execute' -> 'autoserv'
2030 return self._pidfile_name()[1:-len('_execute')]
2031
2032
showard9bb960b2009-11-19 01:02:11 +00002033class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002034 """
2035 Task responsible for
2036 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2037 * copying logs to the results repository
2038 * spawning CleanupTasks for hosts, if necessary
2039 * spawning a FinalReparseTask for the job
2040 """
showardd1195652009-12-08 22:21:02 +00002041 def __init__(self, queue_entries, recover_run_monitor=None):
2042 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002043 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002044 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002045 self._set_ids(queue_entries=queue_entries)
2046
2047
2048 def _generate_command(self, results_dir):
2049 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002050 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00002051        return [_autoserv_path, '-p',
2052 '--pidfile-label=%s' % self._pidfile_label(),
2053 '--use-existing-results', '--collect-crashinfo',
2054 '-m', host_list, '-r', results_dir]
showardd3dc1992009-04-22 21:01:40 +00002055
2056
showardd1195652009-12-08 22:21:02 +00002057 @property
2058 def num_processes(self):
2059 return len(self.queue_entries)
2060
2061
2062 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002063 return drone_manager.CRASHINFO_PID_FILE
showardd1195652009-12-08 22:21:02 +00002064
2065
showardd3dc1992009-04-22 21:01:40 +00002066 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002067 self._check_queue_entry_statuses(
2068 self.queue_entries,
2069 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
2070 allowed_host_statuses=(models.Host.Status.RUNNING,))
showard8cc058f2009-09-08 16:26:33 +00002071
showardd3dc1992009-04-22 21:01:40 +00002072 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002073
2074
showardd3dc1992009-04-22 21:01:40 +00002075 def epilog(self):
2076 super(GatherLogsTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002077 self._parse_results(self.queue_entries)
showard9bb960b2009-11-19 01:02:11 +00002078 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002079
showard9bb960b2009-11-19 01:02:11 +00002080
2081 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002082 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002083 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002084 models.HostQueueEntry.Status.COMPLETED)
2085 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2086 else:
2087 final_success = False
2088 num_tests_failed = 0
2089
showard9bb960b2009-11-19 01:02:11 +00002090 reboot_after = self._job.reboot_after
2091 do_reboot = (
2092 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002093 self._final_status() == models.HostQueueEntry.Status.ABORTED
jamesrendd855242010-03-02 22:23:44 +00002094 or reboot_after == model_attributes.RebootAfter.ALWAYS
2095 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
showard9bb960b2009-11-19 01:02:11 +00002096 and final_success and num_tests_failed == 0))
2097
showardd1195652009-12-08 22:21:02 +00002098 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002099 if do_reboot:
2100 # don't pass the queue entry to the CleanupTask. if the cleanup
2101 # fails, the job doesn't care -- it's over.
2102 models.SpecialTask.objects.create(
2103 host=models.Host.objects.get(id=queue_entry.host.id),
2104 task=models.SpecialTask.Task.CLEANUP,
2105 requested_by=self._job.owner_model())
2106 else:
2107 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002108
2109
showard0bbfc212009-04-29 21:06:13 +00002110 def run(self):
showard597bfd32009-05-08 18:22:50 +00002111 autoserv_exit_code = self._autoserv_monitor.exit_code()
2112 # only run if Autoserv exited due to some signal. if we have no exit
2113 # code, assume something bad (and signal-like) happened.
2114 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002115 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002116 else:
2117 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002118
2119
mbligh4608b002010-01-05 18:22:35 +00002120class SelfThrottledPostJobTask(PostJobTask):
2121 """
2122    PostJobTask base class; each subclass maintains its own global process limit.
2123 """
2124 _num_running_processes = 0
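    # Note: the augmented assignments in the classmethods below bind
    # _num_running_processes on the concrete subclass, so each subclass of
    # SelfThrottledPostJobTask tracks (and limits) its own processes
    # independently.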
showarded2afea2009-07-07 20:54:07 +00002125
2126
mbligh4608b002010-01-05 18:22:35 +00002127 @classmethod
2128 def _increment_running_processes(cls):
2129 cls._num_running_processes += 1
mbligh16c722d2008-03-05 00:58:44 +00002130
mblighd5c95802008-03-05 00:33:46 +00002131
mbligh4608b002010-01-05 18:22:35 +00002132 @classmethod
2133 def _decrement_running_processes(cls):
2134 cls._num_running_processes -= 1
showard8cc058f2009-09-08 16:26:33 +00002135
2136
mbligh4608b002010-01-05 18:22:35 +00002137 @classmethod
2138 def _max_processes(cls):
2139 raise NotImplementedError
2140
2141
2142 @classmethod
2143 def _can_run_new_process(cls):
2144 return cls._num_running_processes < cls._max_processes()
2145
2146
2147 def _process_started(self):
2148 return bool(self.monitor)
2149
2150
2151 def tick(self):
2152 # override tick to keep trying to start until the process count goes
2153 # down and we can, at which point we revert to default behavior
2154 if self._process_started():
2155 super(SelfThrottledPostJobTask, self).tick()
2156 else:
2157 self._try_starting_process()
2158
2159
2160 def run(self):
2161 # override run() to not actually run unless we can
2162 self._try_starting_process()
2163
2164
2165 def _try_starting_process(self):
2166 if not self._can_run_new_process():
showard775300b2009-09-09 15:30:50 +00002167 return
2168
mbligh4608b002010-01-05 18:22:35 +00002169 # actually run the command
2170 super(SelfThrottledPostJobTask, self).run()
jamesren25663562010-04-27 18:00:55 +00002171 if self._process_started():
2172 self._increment_running_processes()
mblighd5c95802008-03-05 00:33:46 +00002173
mblighd5c95802008-03-05 00:33:46 +00002174
mbligh4608b002010-01-05 18:22:35 +00002175 def finished(self, success):
2176 super(SelfThrottledPostJobTask, self).finished(success)
2177 if self._process_started():
2178 self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002179
showard21baa452008-10-21 00:08:39 +00002180
mbligh4608b002010-01-05 18:22:35 +00002181class FinalReparseTask(SelfThrottledPostJobTask):
showardd1195652009-12-08 22:21:02 +00002182 def __init__(self, queue_entries):
2183 super(FinalReparseTask, self).__init__(queue_entries,
2184 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002185 # don't use _set_ids, since we don't want to set the host_ids
2186 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002187
2188
2189 def _generate_command(self, results_dir):
mbligh4608b002010-01-05 18:22:35 +00002190 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd1195652009-12-08 22:21:02 +00002191 results_dir]
2192
2193
2194 @property
2195 def num_processes(self):
2196 return 0 # don't include parser processes in accounting
2197
2198
2199 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002200 return drone_manager.PARSER_PID_FILE
showardd1195652009-12-08 22:21:02 +00002201
2202
showard97aed502008-11-04 02:01:24 +00002203 @classmethod
mbligh4608b002010-01-05 18:22:35 +00002204 def _max_processes(cls):
2205 return scheduler_config.config.max_parse_processes
showard97aed502008-11-04 02:01:24 +00002206
2207
2208 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002209 self._check_queue_entry_statuses(
2210 self.queue_entries,
2211 allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
showard8cc058f2009-09-08 16:26:33 +00002212
showard97aed502008-11-04 02:01:24 +00002213 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002214
2215
2216 def epilog(self):
2217 super(FinalReparseTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002218 self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002219
2220
mbligh4608b002010-01-05 18:22:35 +00002221class ArchiveResultsTask(SelfThrottledPostJobTask):
showarde1575b52010-01-15 00:21:12 +00002222 _ARCHIVING_FAILED_FILE = '.archiver_failed'
2223
mbligh4608b002010-01-05 18:22:35 +00002224 def __init__(self, queue_entries):
2225 super(ArchiveResultsTask, self).__init__(queue_entries,
2226 log_file_name='.archiving.log')
2227 # don't use _set_ids, since we don't want to set the host_ids
2228 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00002229
2230
mbligh4608b002010-01-05 18:22:35 +00002231 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002232 return drone_manager.ARCHIVER_PID_FILE
showard97aed502008-11-04 02:01:24 +00002233
2234
mbligh4608b002010-01-05 18:22:35 +00002235 def _generate_command(self, results_dir):
2236        return [_autoserv_path, '-p',
2237 '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
mblighe0cbc912010-03-11 18:03:07 +00002238 '--use-existing-results', '--control-filename=control.archive',
showard948eb302010-01-15 00:16:20 +00002239 os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
2240 'archive_results.control.srv')]
showard97aed502008-11-04 02:01:24 +00002241
2242
mbligh4608b002010-01-05 18:22:35 +00002243 @classmethod
2244 def _max_processes(cls):
2245 return scheduler_config.config.max_transfer_processes
showarda9545c02009-12-18 22:44:26 +00002246
2247
2248 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002249 self._check_queue_entry_statuses(
2250 self.queue_entries,
2251 allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
2252
2253 super(ArchiveResultsTask, self).prolog()
showarda9545c02009-12-18 22:44:26 +00002254
2255
mbligh4608b002010-01-05 18:22:35 +00002256 def epilog(self):
2257 super(ArchiveResultsTask, self).epilog()
showard4076c632010-01-15 20:28:49 +00002258 if not self.success and self._paired_with_monitor().has_process():
showarde1575b52010-01-15 00:21:12 +00002259 failed_file = os.path.join(self._working_directory(),
2260 self._ARCHIVING_FAILED_FILE)
2261 paired_process = self._paired_with_monitor().get_process()
2262 _drone_manager.write_lines_to_file(
2263 failed_file, ['Archiving failed with exit code %s'
2264 % self.monitor.exit_code()],
2265 paired_with_process=paired_process)
mbligh4608b002010-01-05 18:22:35 +00002266 self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002267
2268
mbligh36768f02008-02-22 18:28:33 +00002269if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002270 main()