#!/usr/bin/python -u

"""
Autotest scheduler
"""


import common
import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback, urllib
import itertools, logging, weakref, gc

import MySQLdb

from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.scheduler import scheduler_models
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_testing_mode = False
_drone_manager = None

def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60

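# The scheduler reads its settings from the SCHEDULER section of
# global_config.ini. A minimal sketch of that section, limited to keys queried
# in this file; the values shown are illustrative, not shipped defaults:
#
#     [SCHEDULER]
#     enable_scheduler: True
#     pidfile_timeout_mins: 5
#     drones: localhost
#     results_host: localhost
#     gc_stats_interval_mins: 360
#     tick_debug: False
#     die_on_orphans: False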

def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why processes the scheduler launches are
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
            logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
                               for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
                              will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
                         object was supplied, this will be used to lookup the
                         Job object.
    """
    autoserv_argv = [_autoserv_path, '-p',
                     '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        autoserv_argv += ['-m', machines]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
    if verbose:
        autoserv_argv.append('--verbose')
    return autoserv_argv + extra_args

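# A purely illustrative example of the command line built above (the job owner
# 'debug_user', job name 'sleeptest' and the extra argument are hypothetical):
#
#     _autoserv_command_line('host1,host2', ['--example-extra-arg'], job=job)
#
# would return roughly:
#
#     [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY,
#      '-m', 'host1,host2', '-u', 'debug_user', '-l', 'sleeptest',
#      '--verbose', '--example-extra-arg']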

class BaseDispatcher(object):
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def tick(self):
        """
        Run one scheduling cycle, logging when each major step begins so we
        can see where most of the tick time is being spent.
        """
        self._log_tick_msg('Calling new tick, starting garbage collection.')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask instance to run the given SpecialTask.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where="status = '%s'" % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)

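    # Illustrative ordering only: if the queued SpecialTasks were
    # [Verify(host1), Repair(host2), Cleanup(host3)], the method above would
    # return [Repair(host2), Cleanup(host3), Verify(host1)], since repairs sort
    # ahead of cleanups, which sort ahead of verifies.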

    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))

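    # For example (values illustrative), _reverify_hosts_where(
    # "status = 'Repair Failed'") fetches hosts matching
    # "locked = 0 AND invalid = 0 AND status = 'Repair Failed'" and creates a
    # Cleanup SpecialTask for each one that has no agent and no special task
    # already scheduled.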

    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='afe_jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        logging.debug('Processing %d queue_entries', len(queue_entries))
        for queue_entry in queue_entries:
            logging.debug('Processing queue_entry: %s', queue_entry)
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                logging.debug('Scheduling hostless job.')
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)


    def _schedule_running_host_queue_entries(self):
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        logging.debug('Scheduling pre job tasks for queue_entry: %s',
                      queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True

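    # Worked example of the per-cycle throttle above, with an illustrative
    # max_processes_started_per_cycle of 50: an agent needing 10 processes is
    # deferred once 45 processes have already been started this cycle
    # (45 + 10 > 50), unless it would be the very first agent started this
    # cycle, in which case it is always allowed through.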

    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            logging.debug('Processing Agent with Host Ids: %s and queue_entry '
                          'ids:%s', agent.host_ids, agent.queue_entry_ids)
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_cycle += agent.task.num_processes
                logging.debug('Starting Agent')
            agent.tick()
            logging.debug('Agent tick completed.')
            if agent.is_done():
                logging.info("Agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes. %d added this cycle.',
                     _drone_manager.total_running_processes(),
                     num_started_this_cycle)


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                # TODO: send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0:  # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


SiteDispatcher = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    pass


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """
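    # Typical usage, as an illustrative sketch only (the names are the ones
    # defined in this class):
    #
    #     monitor = PidfileRunMonitor()
    #     monitor.run(command, working_directory, num_processes=1,
    #                 nice_level=AUTOSERV_NICE_LEVEL)
    #     ...then, on a later tick...
    #     if monitor.exit_code() is not None:
    #         succeeded = (monitor.exit_code() == 0)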

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, num_processes, nice_level=None,
            log_file=None, pidfile_name=None, paired_with_pidfile=None,
            username=None, drone_hostnames_allowed=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            num_processes=num_processes, log_file=log_file,
            paired_with_pidfile=paired_with_pidfile, username=username,
            drone_hostnames_allowed=drone_hostnames_allowed)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=drone_manager.AUTOSERV_PID_FILE,
                                   num_processes=None):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_path, pidfile_name=pidfile_name)
        if num_processes is not None:
            _drone_manager.declare_process_count(self.pidfile_id, num_processes)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        if time.time() - self._start_time > _get_pidfile_timeout_secs():
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """@returns The number of tests that failed or -1 if unknown."""
        self._get_pidfile_info()
        if self._state.num_tests_failed is None:
            return -1
        return self._state.num_tests_failed


    def try_copy_results_on_drone(self, **kwargs):
        if self.has_process():
            # copy results logs into the normal place for job results
            _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)


    def try_copy_to_results_repository(self, source, **kwargs):
        if self.has_process():
            _drone_manager.copy_to_results_repository(self.get_process(),
                                                      source, **kwargs)


class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested. The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """
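    # A minimal sketch of an object satisfying the interface described above
    # (hypothetical, for illustration only; the real tasks in this module
    # derive from BaseAgentTask):
    #
    #     class NoopTask(object):
    #         queue_entry_ids = []
    #         host_ids = []
    #         aborted = False
    #         success = None
    #         num_processes = 0  # lets _can_start_agent() always admit it
    #
    #         def poll(self):
    #             self.success = True
    #
    #         def is_done(self):
    #             return self.success is not None
    #
    #         def abort(self):
    #             self.aborted = True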


    def __init__(self, task):
        """
        @param task: A task as described in the class docstring.
        """
        self.task = task

        # This is filled in by Dispatcher.add_agent_task()
jadmanski0afbb632008-06-06 21:10:57 +00001080 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001081
showard8cc058f2009-09-08 16:26:33 +00001082 self.queue_entry_ids = task.queue_entry_ids
1083 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001084
showard8cc058f2009-09-08 16:26:33 +00001085 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001086 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001087
1088
jadmanski0afbb632008-06-06 21:10:57 +00001089 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001090 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001091 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001092 self.task.poll()
1093 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001094 self.finished = True
showardec113162008-05-08 00:52:49 +00001095
1096
jadmanski0afbb632008-06-06 21:10:57 +00001097 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001098 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001099
1100
showardd3dc1992009-04-22 21:01:40 +00001101 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001102 if self.task:
1103 self.task.abort()
1104 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001105 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001106 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001107
showardd3dc1992009-04-22 21:01:40 +00001108
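# Illustrative sketch only (not used by the scheduler): a hypothetical minimal
# object satisfying the task interface Agent expects above -- poll(),
# is_done(), abort(), plus the aborted, success, queue_entry_ids and host_ids
# attributes. Real tasks are the AgentTask subclasses defined below.
class _ExampleNoOpTask(object):
    def __init__(self):
        self.aborted = False
        self.success = None
        self.queue_entry_ids = []
        self.host_ids = []
        self._done = False


    def poll(self):
        # a real task would check on its process here; this one finishes
        # immediately and reports success
        self.success = True
        self._done = True


    def is_done(self):
        return self._done


    def abort(self):
        self.aborted = True
        self._done = True

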
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001109class BaseAgentTask(object):
showardd1195652009-12-08 22:21:02 +00001110 class _NullMonitor(object):
1111 pidfile_id = None
1112
1113 def has_process(self):
1114 return True
1115
1116
1117 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001118 """
showardd1195652009-12-08 22:21:02 +00001119 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001120 """
jadmanski0afbb632008-06-06 21:10:57 +00001121 self.done = False
showardd1195652009-12-08 22:21:02 +00001122 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001123 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001124 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001125 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001126 self.queue_entry_ids = []
1127 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001128 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001129
1130
1131 def _set_ids(self, host=None, queue_entries=None):
1132 if queue_entries and queue_entries != [None]:
1133 self.host_ids = [entry.host.id for entry in queue_entries]
1134 self.queue_entry_ids = [entry.id for entry in queue_entries]
1135 else:
1136 assert host
1137 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001138
1139
jadmanski0afbb632008-06-06 21:10:57 +00001140 def poll(self):
showard08a36412009-05-05 01:01:13 +00001141 if not self.started:
1142 self.start()
showardd1195652009-12-08 22:21:02 +00001143 if not self.done:
1144 self.tick()
showard08a36412009-05-05 01:01:13 +00001145
1146
1147 def tick(self):
showardd1195652009-12-08 22:21:02 +00001148 assert self.monitor
1149 exit_code = self.monitor.exit_code()
1150 if exit_code is None:
1151 return
mbligh36768f02008-02-22 18:28:33 +00001152
showardd1195652009-12-08 22:21:02 +00001153 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001154 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001155
1156
jadmanski0afbb632008-06-06 21:10:57 +00001157 def is_done(self):
1158 return self.done
mbligh36768f02008-02-22 18:28:33 +00001159
1160
jadmanski0afbb632008-06-06 21:10:57 +00001161 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001162 if self.done:
showardd1195652009-12-08 22:21:02 +00001163 assert self.started
showard08a36412009-05-05 01:01:13 +00001164 return
showardd1195652009-12-08 22:21:02 +00001165 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001166 self.done = True
1167 self.success = success
1168 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001169
1170
jadmanski0afbb632008-06-06 21:10:57 +00001171 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001172 """
1173 To be overridden.
1174 """
showarded2afea2009-07-07 20:54:07 +00001175 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001176 self.register_necessary_pidfiles()
1177
1178
1179 def _log_file(self):
1180 if not self._log_file_name:
1181 return None
1182 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001183
mbligh36768f02008-02-22 18:28:33 +00001184
jadmanski0afbb632008-06-06 21:10:57 +00001185 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001186 log_file = self._log_file()
1187 if self.monitor and log_file:
1188 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001189
1190
jadmanski0afbb632008-06-06 21:10:57 +00001191 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001192 """
1193 To be overridden.
1194 """
jadmanski0afbb632008-06-06 21:10:57 +00001195 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001196 logging.info("%s finished with success=%s", type(self).__name__,
1197 self.success)
1198
mbligh36768f02008-02-22 18:28:33 +00001199
1200
jadmanski0afbb632008-06-06 21:10:57 +00001201 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001202 if not self.started:
1203 self.prolog()
1204 self.run()
1205
1206 self.started = True
1207
1208
1209 def abort(self):
1210 if self.monitor:
1211 self.monitor.kill()
1212 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001213 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001214 self.cleanup()
1215
1216
showarded2afea2009-07-07 20:54:07 +00001217 def _get_consistent_execution_path(self, execution_entries):
1218 first_execution_path = execution_entries[0].execution_path()
1219 for execution_entry in execution_entries[1:]:
1220 assert execution_entry.execution_path() == first_execution_path, (
1221 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1222 execution_entry,
1223 first_execution_path,
1224 execution_entries[0]))
1225 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001226
1227
showarded2afea2009-07-07 20:54:07 +00001228 def _copy_results(self, execution_entries, use_monitor=None):
1229 """
1230 @param execution_entries: list of objects with execution_path() method
1231 """
showard6d1c1432009-08-20 23:30:39 +00001232 if use_monitor is not None and not use_monitor.has_process():
1233 return
1234
showarded2afea2009-07-07 20:54:07 +00001235 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001236 if use_monitor is None:
1237 assert self.monitor
1238 use_monitor = self.monitor
1239 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001240 execution_path = self._get_consistent_execution_path(execution_entries)
1241 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001242 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001243
showarda1e74b32009-05-12 17:32:04 +00001244
1245 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001246 for queue_entry in queue_entries:
1247 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001248
1249
mbligh4608b002010-01-05 18:22:35 +00001250 def _archive_results(self, queue_entries):
1251 for queue_entry in queue_entries:
1252 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001253
1254
showardd1195652009-12-08 22:21:02 +00001255 def _command_line(self):
1256 """
1257 Return the command line to run. Must be overridden.
1258 """
1259 raise NotImplementedError
1260
1261
1262 @property
1263 def num_processes(self):
1264 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001265 Return the number of processes forked by this BaseAgentTask's process.
1266 It may only be approximate. To be overridden if necessary.
showardd1195652009-12-08 22:21:02 +00001267 """
1268 return 1
1269
1270
1271 def _paired_with_monitor(self):
1272 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001273 If this BaseAgentTask's process must run on the same machine as some
showardd1195652009-12-08 22:21:02 +00001274 previous process, this method should be overridden to return a
1275 PidfileRunMonitor for that process.
1276 """
1277 return self._NullMonitor()
1278
1279
1280 @property
1281 def owner_username(self):
1282 """
1283 Return login of user responsible for this task. May be None. Must be
1284 overridden.
1285 """
1286 raise NotImplementedError
1287
1288
1289 def _working_directory(self):
1290 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001291 Return the directory where this BaseAgentTask's process executes.
1292 Must be overridden.
showardd1195652009-12-08 22:21:02 +00001293 """
1294 raise NotImplementedError
1295
1296
1297 def _pidfile_name(self):
1298 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001299 Return the name of the pidfile this BaseAgentTask's process uses. To be
showardd1195652009-12-08 22:21:02 +00001300 overridden if necessary.
1301 """
jamesrenc44ae992010-02-19 00:12:54 +00001302 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001303
1304
1305 def _check_paired_results_exist(self):
1306 if not self._paired_with_monitor().has_process():
1307 email_manager.manager.enqueue_notify_email(
1308 'No paired results in task',
1309 'No paired results in task %s at %s'
1310 % (self, self._paired_with_monitor().pidfile_id))
1311 self.finished(False)
1312 return False
1313 return True
1314
1315
1316 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001317 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001318 self.monitor = PidfileRunMonitor()
1319
1320
1321 def run(self):
1322 if not self._check_paired_results_exist():
1323 return
1324
1325 self._create_monitor()
1326 self.monitor.run(
1327 self._command_line(), self._working_directory(),
1328 num_processes=self.num_processes,
1329 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1330 pidfile_name=self._pidfile_name(),
1331 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001332 username=self.owner_username,
1333 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1334
1335
1336 def get_drone_hostnames_allowed(self):
1337 if not models.DroneSet.drone_sets_enabled():
1338 return None
1339
1340 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1341 if not hqes:
1342 # Only special tasks could be missing host queue entries
1343 assert isinstance(self, SpecialAgentTask)
1344 return self._user_or_global_default_drone_set(
1345 self.task, self.task.requested_by)
1346
1347 job_ids = hqes.values_list('job', flat=True).distinct()
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001348 assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
jamesren76fcf192010-04-21 20:39:50 +00001349 "span multiple jobs")
1350
1351 job = models.Job.objects.get(id=job_ids[0])
1352 drone_set = job.drone_set
1353 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001354 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001355
1356 return drone_set.get_drone_hostnames()
1357
1358
1359 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1360 """
1361 Returns the user's default drone set, if present.
1362
1363 Otherwise, returns the global default drone set.
1364 """
1365 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1366 if not user:
1367 logging.warn('%s had no owner; using default drone set',
1368 obj_with_owner)
1369 return default_hostnames
1370 if not user.drone_set:
1371 logging.warn('User %s has no default drone set, using global '
1372 'default', user.login)
1373 return default_hostnames
1374 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001375
1376
1377 def register_necessary_pidfiles(self):
1378 pidfile_id = _drone_manager.get_pidfile_id_from(
1379 self._working_directory(), self._pidfile_name())
1380 _drone_manager.register_pidfile(pidfile_id)
1381
1382 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1383 if paired_pidfile_id:
1384 _drone_manager.register_pidfile(paired_pidfile_id)
1385
1386
1387 def recover(self):
1388 if not self._check_paired_results_exist():
1389 return
1390
1391 self._create_monitor()
1392 self.monitor.attach_to_existing_process(
1393 self._working_directory(), pidfile_name=self._pidfile_name(),
1394 num_processes=self.num_processes)
1395 if not self.monitor.has_process():
1396 # no process to recover; wait to be started normally
1397 self.monitor = None
1398 return
1399
1400 self.started = True
1401 logging.info('Recovering process %s for %s at %s'
1402 % (self.monitor.get_process(), type(self).__name__,
1403 self._working_directory()))
mbligh36768f02008-02-22 18:28:33 +00001404
1405
mbligh4608b002010-01-05 18:22:35 +00001406 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1407 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001408 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001409 for entry in queue_entries:
1410 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001411 raise host_scheduler.SchedulerError(
1412 '%s attempting to start entry with invalid status %s: '
1413 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001414 invalid_host_status = (
1415 allowed_host_statuses is not None
1416 and entry.host.status not in allowed_host_statuses)
1417 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001418 raise host_scheduler.SchedulerError(
1419 '%s attempting to start on queue entry with invalid '
1420 'host status %s: %s'
1421 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001422
1423
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001424SiteAgentTask = utils.import_site_class(
1425 __file__, 'autotest_lib.scheduler.site_monitor_db',
1426 'SiteAgentTask', BaseAgentTask)
1427
1428class AgentTask(SiteAgentTask):
1429 pass
1430
1431
showardd9205182009-04-27 20:09:55 +00001432class TaskWithJobKeyvals(object):
1433 """AgentTask mixin providing functionality to help with job keyval files."""
1434 _KEYVAL_FILE = 'keyval'
1435 def _format_keyval(self, key, value):
1436 return '%s=%s' % (key, value)
1437
1438
1439 def _keyval_path(self):
1440 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001441 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001442
1443
1444 def _write_keyval_after_job(self, field, value):
1445 assert self.monitor
1446 if not self.monitor.has_process():
1447 return
1448 _drone_manager.write_lines_to_file(
1449 self._keyval_path(), [self._format_keyval(field, value)],
1450 paired_with_process=self.monitor.get_process())
1451
1452
1453 def _job_queued_keyval(self, job):
1454 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1455
1456
1457 def _write_job_finished(self):
1458 self._write_keyval_after_job("job_finished", int(time.time()))
1459
1460
showarddb502762009-09-09 15:31:20 +00001461 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1462 keyval_contents = '\n'.join(self._format_keyval(key, value)
1463 for key, value in keyval_dict.iteritems())
1464 # always end with a newline to allow additional keyvals to be written
1465 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001466 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001467 keyval_contents,
1468 file_path=keyval_path)
1469
1470
1471 def _write_keyvals_before_job(self, keyval_dict):
1472 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1473
1474
1475 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001476 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001477 host.hostname)
1478 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001479         all_labels = [urllib.quote(label) for label in all_labels]
showarddb502762009-09-09 15:31:20 +00001480 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1481 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1482
1483
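# Illustrative sketch only: the keyval files written by TaskWithJobKeyvals are
# plain 'key=value' lines ending with a trailing newline so later writers can
# append. This hypothetical helper mirrors _format_keyval() and
# _write_keyvals_before_job_helper() under that assumption; it is not called
# by the scheduler.
def _example_keyval_contents(keyval_dict):
    """E.g. {'job_queued': 123, 'job_finished': 456} -> two key=value lines."""
    lines = ['%s=%s' % (key, value)
             for key, value in keyval_dict.iteritems()]
    return '\n'.join(lines) + '\n'

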
showard8cc058f2009-09-08 16:26:33 +00001484class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001485 """
1486 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1487 """
1488
1489 TASK_TYPE = None
1490 host = None
1491 queue_entry = None
1492
showardd1195652009-12-08 22:21:02 +00001493 def __init__(self, task, extra_command_args):
1494 super(SpecialAgentTask, self).__init__()
1495
lmrb7c5d272010-04-16 06:34:04 +00001496 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001497
jamesrenc44ae992010-02-19 00:12:54 +00001498 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001499 self.queue_entry = None
1500 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001501 self.queue_entry = scheduler_models.HostQueueEntry(
1502 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001503
showarded2afea2009-07-07 20:54:07 +00001504 self.task = task
1505 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001506
1507
showard8cc058f2009-09-08 16:26:33 +00001508 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001509 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1510
1511
1512 def _command_line(self):
1513 return _autoserv_command_line(self.host.hostname,
1514 self._extra_command_args,
1515 queue_entry=self.queue_entry)
1516
1517
1518 def _working_directory(self):
1519 return self.task.execution_path()
1520
1521
1522 @property
1523 def owner_username(self):
1524 if self.task.requested_by:
1525 return self.task.requested_by.login
1526 return None
showard8cc058f2009-09-08 16:26:33 +00001527
1528
showarded2afea2009-07-07 20:54:07 +00001529 def prolog(self):
1530 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001531 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001532 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001533
1534
showardde634ee2009-01-30 01:44:24 +00001535 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001536 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001537
showard2fe3f1d2009-07-06 20:19:11 +00001538 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001539 return # don't fail metahost entries, they'll be reassigned
1540
showard2fe3f1d2009-07-06 20:19:11 +00001541 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001542 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001543 return # entry has been aborted
1544
showard2fe3f1d2009-07-06 20:19:11 +00001545 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001546 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001547 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001548 self._write_keyval_after_job(queued_key, queued_time)
1549 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001550
showard8cc058f2009-09-08 16:26:33 +00001551 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001552 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001553 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001554 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001555
showard8cc058f2009-09-08 16:26:33 +00001556 pidfile_id = _drone_manager.get_pidfile_id_from(
1557 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001558 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001559 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001560
1561 if self.queue_entry.job.parse_failed_repair:
1562 self._parse_results([self.queue_entry])
1563 else:
1564 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001565
1566
1567 def cleanup(self):
1568 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001569
1570 # We will consider an aborted task to be "Failed"
1571 self.task.finish(bool(self.success))
1572
showardf85a0b72009-10-07 20:48:45 +00001573 if self.monitor:
1574 if self.monitor.has_process():
1575 self._copy_results([self.task])
1576 if self.monitor.pidfile_id is not None:
1577 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001578
1579
1580class RepairTask(SpecialAgentTask):
1581 TASK_TYPE = models.SpecialTask.Task.REPAIR
1582
1583
showardd1195652009-12-08 22:21:02 +00001584 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001585 """\
                     1586         task: SpecialTask for this repair; its queue entry, if any, is marked failed if the repair fails.
1587 """
1588 protection = host_protections.Protection.get_string(
1589 task.host.protection)
1590 # normalize the protection name
1591 protection = host_protections.Protection.get_attr_name(protection)
1592
1593 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001594 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001595
1596 # *don't* include the queue entry in IDs -- if the queue entry is
1597 # aborted, we want to leave the repair task running
1598 self._set_ids(host=self.host)
1599
1600
1601 def prolog(self):
1602 super(RepairTask, self).prolog()
1603 logging.info("repair_task starting")
1604 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001605
1606
jadmanski0afbb632008-06-06 21:10:57 +00001607 def epilog(self):
1608 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001609
jadmanski0afbb632008-06-06 21:10:57 +00001610 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001611 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001612 else:
showard8cc058f2009-09-08 16:26:33 +00001613 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001614 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001615 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001616
1617
showarded2afea2009-07-07 20:54:07 +00001618class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001619 def _copy_to_results_repository(self):
1620 if not self.queue_entry or self.queue_entry.meta_host:
1621 return
1622
1623 self.queue_entry.set_execution_subdir()
1624 log_name = os.path.basename(self.task.execution_path())
1625 source = os.path.join(self.task.execution_path(), 'debug',
1626 'autoserv.DEBUG')
1627 destination = os.path.join(
1628 self.queue_entry.execution_path(), log_name)
1629
1630 self.monitor.try_copy_to_results_repository(
1631 source, destination_path=destination)
1632
1633
showard170873e2009-01-07 00:22:26 +00001634 def epilog(self):
1635 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001636
showard775300b2009-09-09 15:30:50 +00001637 if self.success:
1638 return
showard8fe93b52008-11-18 17:53:22 +00001639
showard775300b2009-09-09 15:30:50 +00001640 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001641
showard775300b2009-09-09 15:30:50 +00001642 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001643 # effectively ignore failure for these hosts
1644 self.success = True
showard775300b2009-09-09 15:30:50 +00001645 return
1646
1647 if self.queue_entry:
1648 self.queue_entry.requeue()
1649
1650 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001651 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001652 queue_entry__id=self.queue_entry.id):
1653 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1654 self._fail_queue_entry()
1655 return
1656
showard9bb960b2009-11-19 01:02:11 +00001657 queue_entry = models.HostQueueEntry.objects.get(
1658 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001659 else:
1660 queue_entry = None
1661
1662 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001663 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001664 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001665 queue_entry=queue_entry,
1666 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001667
showard8fe93b52008-11-18 17:53:22 +00001668
1669class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001670 TASK_TYPE = models.SpecialTask.Task.VERIFY
1671
1672
showardd1195652009-12-08 22:21:02 +00001673 def __init__(self, task):
1674 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001675 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001676
1677
jadmanski0afbb632008-06-06 21:10:57 +00001678 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001679 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001680
showardb18134f2009-03-20 20:52:18 +00001681 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001682 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001683 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1684 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001685
jamesren42318f72010-05-10 23:40:59 +00001686 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00001687 # and there's no need to keep records of other requests.
1688 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00001689 host__id=self.host.id,
1690 task=models.SpecialTask.Task.VERIFY,
jamesren42318f72010-05-10 23:40:59 +00001691 is_active=False, is_complete=False, queue_entry=None)
showarded2afea2009-07-07 20:54:07 +00001692 queued_verifies = queued_verifies.exclude(id=self.task.id)
1693 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00001694
mbligh36768f02008-02-22 18:28:33 +00001695
jadmanski0afbb632008-06-06 21:10:57 +00001696 def epilog(self):
1697 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001698 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001699 if self.queue_entry:
1700 self.queue_entry.on_pending()
1701 else:
1702 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001703
1704
mbligh4608b002010-01-05 18:22:35 +00001705class CleanupTask(PreJobTask):
1706 # note this can also run post-job, but when it does, it's running standalone
1707 # against the host (not related to the job), so it's not considered a
1708 # PostJobTask
1709
1710 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1711
1712
1713 def __init__(self, task, recover_run_monitor=None):
1714 super(CleanupTask, self).__init__(task, ['--cleanup'])
1715 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1716
1717
1718 def prolog(self):
1719 super(CleanupTask, self).prolog()
1720 logging.info("starting cleanup task for host: %s", self.host.hostname)
1721 self.host.set_status(models.Host.Status.CLEANING)
1722 if self.queue_entry:
1723 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1724
1725
1726 def _finish_epilog(self):
1727 if not self.queue_entry or not self.success:
1728 return
1729
1730 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
1731 should_run_verify = (
1732 self.queue_entry.job.run_verify
1733 and self.host.protection != do_not_verify_protection)
1734 if should_run_verify:
1735 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1736 models.SpecialTask.objects.create(
1737 host=models.Host.objects.get(id=self.host.id),
1738 queue_entry=entry,
1739 task=models.SpecialTask.Task.VERIFY)
1740 else:
1741 self.queue_entry.on_pending()
1742
1743
1744 def epilog(self):
1745 super(CleanupTask, self).epilog()
1746
1747 if self.success:
1748 self.host.update_field('dirty', 0)
1749 self.host.set_status(models.Host.Status.READY)
1750
1751 self._finish_epilog()
1752
1753
showarda9545c02009-12-18 22:44:26 +00001754class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
1755 """
1756 Common functionality for QueueTask and HostlessQueueTask
1757 """
1758 def __init__(self, queue_entries):
1759 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001760 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001761 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001762
1763
showard73ec0442009-02-07 02:05:20 +00001764 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001765 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001766
1767
jamesrenc44ae992010-02-19 00:12:54 +00001768 def _write_control_file(self, execution_path):
1769 control_path = _drone_manager.attach_file_to_execution(
1770 execution_path, self.job.control_file)
1771 return control_path
1772
1773
showardd1195652009-12-08 22:21:02 +00001774 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001775 execution_path = self.queue_entries[0].execution_path()
1776 control_path = self._write_control_file(execution_path)
1777 hostnames = ','.join(entry.host.hostname
1778 for entry in self.queue_entries
1779 if not entry.is_hostless())
1780
1781 execution_tag = self.queue_entries[0].execution_tag()
1782 params = _autoserv_command_line(
1783 hostnames,
1784 ['-P', execution_tag, '-n',
1785 _drone_manager.absolute_path(control_path)],
1786 job=self.job, verbose=False)
1787
1788 if not self.job.is_server_job():
1789 params.append('-c')
1790
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001791 if self.job.is_image_update_job():
1792 params += ['--image', self.job.update_image_path]
1793
jamesrenc44ae992010-02-19 00:12:54 +00001794 return params
showardd1195652009-12-08 22:21:02 +00001795
1796
1797 @property
1798 def num_processes(self):
1799 return len(self.queue_entries)
1800
1801
1802 @property
1803 def owner_username(self):
1804 return self.job.owner
1805
1806
1807 def _working_directory(self):
1808 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001809
1810
jadmanski0afbb632008-06-06 21:10:57 +00001811 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001812 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001813 keyval_dict = self.job.keyval_dict()
1814 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001815 group_name = self.queue_entries[0].get_group_name()
1816 if group_name:
1817 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001818 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001819 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001820 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001821 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001822
1823
showard35162b02009-03-03 02:17:30 +00001824 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001825 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001826 _drone_manager.write_lines_to_file(error_file_path,
1827 [_LOST_PROCESS_ERROR])
1828
1829
showardd3dc1992009-04-22 21:01:40 +00001830 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001831 if not self.monitor:
1832 return
1833
showardd9205182009-04-27 20:09:55 +00001834 self._write_job_finished()
1835
showard35162b02009-03-03 02:17:30 +00001836 if self.monitor.lost_process:
1837 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001838
jadmanskif7fa2cc2008-10-01 14:13:23 +00001839
showardcbd74612008-11-19 21:42:02 +00001840 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001841 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001842 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001843 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001844 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001845
1846
jadmanskif7fa2cc2008-10-01 14:13:23 +00001847 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001848 if not self.monitor or not self.monitor.has_process():
1849 return
1850
jadmanskif7fa2cc2008-10-01 14:13:23 +00001851 # build up sets of all the aborted_by and aborted_on values
1852 aborted_by, aborted_on = set(), set()
1853 for queue_entry in self.queue_entries:
1854 if queue_entry.aborted_by:
1855 aborted_by.add(queue_entry.aborted_by)
1856 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1857 aborted_on.add(t)
1858
1859 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001860 # TODO(showard): this conditional is now obsolete, we just need to leave
1861 # it in temporarily for backwards compatibility over upgrades. delete
1862 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001863 assert len(aborted_by) <= 1
1864 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001865 aborted_by_value = aborted_by.pop()
1866 aborted_on_value = max(aborted_on)
1867 else:
1868 aborted_by_value = 'autotest_system'
1869 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001870
showarda0382352009-02-11 23:36:43 +00001871 self._write_keyval_after_job("aborted_by", aborted_by_value)
1872 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001873
showardcbd74612008-11-19 21:42:02 +00001874 aborted_on_string = str(datetime.datetime.fromtimestamp(
1875 aborted_on_value))
1876 self._write_status_comment('Job aborted by %s on %s' %
1877 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001878
1879
jadmanski0afbb632008-06-06 21:10:57 +00001880 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001881 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001882 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001883 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001884
1885
jadmanski0afbb632008-06-06 21:10:57 +00001886 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001887 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001888 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001889
1890
1891class QueueTask(AbstractQueueTask):
1892 def __init__(self, queue_entries):
1893 super(QueueTask, self).__init__(queue_entries)
1894 self._set_ids(queue_entries=queue_entries)
1895
1896
1897 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001898 self._check_queue_entry_statuses(
1899 self.queue_entries,
1900 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
1901 models.HostQueueEntry.Status.RUNNING),
1902 allowed_host_statuses=(models.Host.Status.PENDING,
1903 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00001904
1905 super(QueueTask, self).prolog()
1906
1907 for queue_entry in self.queue_entries:
1908 self._write_host_keyvals(queue_entry.host)
1909 queue_entry.host.set_status(models.Host.Status.RUNNING)
1910 queue_entry.host.update_field('dirty', 1)
1911 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
1912 # TODO(gps): Remove this if nothing needs it anymore.
1913 # A potential user is: tko/parser
1914 self.job.write_to_machines_file(self.queue_entries[0])
1915
1916
1917 def _finish_task(self):
1918 super(QueueTask, self)._finish_task()
1919
1920 for queue_entry in self.queue_entries:
1921 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00001922 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00001923
1924
mbligh4608b002010-01-05 18:22:35 +00001925class HostlessQueueTask(AbstractQueueTask):
1926 def __init__(self, queue_entry):
1927 super(HostlessQueueTask, self).__init__([queue_entry])
1928 self.queue_entry_ids = [queue_entry.id]
1929
1930
1931 def prolog(self):
1932 self.queue_entries[0].update_field('execution_subdir', 'hostless')
1933 super(HostlessQueueTask, self).prolog()
1934
1935
mbligh4608b002010-01-05 18:22:35 +00001936 def _finish_task(self):
1937 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00001938 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001939
1940
showardd3dc1992009-04-22 21:01:40 +00001941class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00001942 def __init__(self, queue_entries, log_file_name):
1943 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00001944
showardd1195652009-12-08 22:21:02 +00001945 self.queue_entries = queue_entries
1946
showardd3dc1992009-04-22 21:01:40 +00001947 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00001948 self._autoserv_monitor.attach_to_existing_process(
1949 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00001950
showardd1195652009-12-08 22:21:02 +00001951
1952 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00001953 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00001954 return 'true'
1955 return self._generate_command(
1956 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00001957
1958
1959 def _generate_command(self, results_dir):
1960 raise NotImplementedError('Subclasses must override this')
1961
1962
showardd1195652009-12-08 22:21:02 +00001963 @property
1964 def owner_username(self):
1965 return self.queue_entries[0].job.owner
1966
1967
1968 def _working_directory(self):
1969 return self._get_consistent_execution_path(self.queue_entries)
1970
1971
1972 def _paired_with_monitor(self):
1973 return self._autoserv_monitor
1974
1975
showardd3dc1992009-04-22 21:01:40 +00001976 def _job_was_aborted(self):
1977 was_aborted = None
showardd1195652009-12-08 22:21:02 +00001978 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00001979 queue_entry.update_from_database()
1980 if was_aborted is None: # first queue entry
1981 was_aborted = bool(queue_entry.aborted)
1982 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
jamesren17cadd62010-06-16 23:26:55 +00001983 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
1984 for entry in self.queue_entries]
showardd3dc1992009-04-22 21:01:40 +00001985 email_manager.manager.enqueue_notify_email(
jamesren17cadd62010-06-16 23:26:55 +00001986 'Inconsistent abort state',
1987 'Queue entries have inconsistent abort state:\n' +
1988 '\n'.join(entries))
showardd3dc1992009-04-22 21:01:40 +00001989 # don't crash here, just assume true
1990 return True
1991 return was_aborted
1992
1993
showardd1195652009-12-08 22:21:02 +00001994 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00001995 if self._job_was_aborted():
1996 return models.HostQueueEntry.Status.ABORTED
1997
1998 # we'll use a PidfileRunMonitor to read the autoserv exit status
1999 if self._autoserv_monitor.exit_code() == 0:
2000 return models.HostQueueEntry.Status.COMPLETED
2001 return models.HostQueueEntry.Status.FAILED
2002
2003
showardd3dc1992009-04-22 21:01:40 +00002004 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002005 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002006 queue_entry.set_status(status)
2007
2008
2009 def abort(self):
2010 # override AgentTask.abort() to avoid killing the process and ending
2011 # the task. post-job tasks continue when the job is aborted.
2012 pass
2013
2014
mbligh4608b002010-01-05 18:22:35 +00002015 def _pidfile_label(self):
2016 # '.autoserv_execute' -> 'autoserv'
2017 return self._pidfile_name()[1:-len('_execute')]
2018
2019
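# Illustrative sketch only: PostJobTask._final_status() above reduces to this
# decision -- an aborted job wins, then a clean autoserv exit code means
# COMPLETED, and anything else is FAILED. This hypothetical standalone helper
# restates that logic for clarity; the scheduler does not call it.
def _example_final_status(was_aborted, autoserv_exit_code):
    if was_aborted:
        return models.HostQueueEntry.Status.ABORTED
    if autoserv_exit_code == 0:
        return models.HostQueueEntry.Status.COMPLETED
    return models.HostQueueEntry.Status.FAILED

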
showard9bb960b2009-11-19 01:02:11 +00002020class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002021 """
2022 Task responsible for
2023 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2024 * copying logs to the results repository
2025 * spawning CleanupTasks for hosts, if necessary
2026 * spawning a FinalReparseTask for the job
2027 """
showardd1195652009-12-08 22:21:02 +00002028 def __init__(self, queue_entries, recover_run_monitor=None):
2029 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002030 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002031 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002032 self._set_ids(queue_entries=queue_entries)
2033
2034
2035 def _generate_command(self, results_dir):
2036 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002037 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00002038         return [_autoserv_path, '-p',
2039 '--pidfile-label=%s' % self._pidfile_label(),
2040 '--use-existing-results', '--collect-crashinfo',
2041 '-m', host_list, '-r', results_dir]
showardd3dc1992009-04-22 21:01:40 +00002042
2043
showardd1195652009-12-08 22:21:02 +00002044 @property
2045 def num_processes(self):
2046 return len(self.queue_entries)
2047
2048
2049 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002050 return drone_manager.CRASHINFO_PID_FILE
showardd1195652009-12-08 22:21:02 +00002051
2052
showardd3dc1992009-04-22 21:01:40 +00002053 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002054 self._check_queue_entry_statuses(
2055 self.queue_entries,
2056 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
2057 allowed_host_statuses=(models.Host.Status.RUNNING,))
showard8cc058f2009-09-08 16:26:33 +00002058
showardd3dc1992009-04-22 21:01:40 +00002059 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002060
2061
showardd3dc1992009-04-22 21:01:40 +00002062 def epilog(self):
2063 super(GatherLogsTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002064 self._parse_results(self.queue_entries)
showard9bb960b2009-11-19 01:02:11 +00002065 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002066
showard9bb960b2009-11-19 01:02:11 +00002067
2068 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002069 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002070 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002071 models.HostQueueEntry.Status.COMPLETED)
2072 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2073 else:
2074 final_success = False
2075 num_tests_failed = 0
2076
showard9bb960b2009-11-19 01:02:11 +00002077 reboot_after = self._job.reboot_after
2078 do_reboot = (
2079 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002080 self._final_status() == models.HostQueueEntry.Status.ABORTED
jamesrendd855242010-03-02 22:23:44 +00002081 or reboot_after == model_attributes.RebootAfter.ALWAYS
2082 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
showard9bb960b2009-11-19 01:02:11 +00002083 and final_success and num_tests_failed == 0))
2084
showardd1195652009-12-08 22:21:02 +00002085 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002086 if do_reboot:
2087 # don't pass the queue entry to the CleanupTask. if the cleanup
2088 # fails, the job doesn't care -- it's over.
2089 models.SpecialTask.objects.create(
2090 host=models.Host.objects.get(id=queue_entry.host.id),
2091 task=models.SpecialTask.Task.CLEANUP,
2092 requested_by=self._job.owner_model())
2093 else:
2094 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002095
2096
showard0bbfc212009-04-29 21:06:13 +00002097 def run(self):
showard597bfd32009-05-08 18:22:50 +00002098 autoserv_exit_code = self._autoserv_monitor.exit_code()
2099 # only run if Autoserv exited due to some signal. if we have no exit
2100 # code, assume something bad (and signal-like) happened.
2101 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002102 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002103 else:
2104 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002105
2106
mbligh4608b002010-01-05 18:22:35 +00002107class SelfThrottledPostJobTask(PostJobTask):
2108 """
2109 Special AgentTask subclass that maintains its own global process limit.
2110 """
2111 _num_running_processes = 0
showarded2afea2009-07-07 20:54:07 +00002112
2113
mbligh4608b002010-01-05 18:22:35 +00002114 @classmethod
2115 def _increment_running_processes(cls):
2116 cls._num_running_processes += 1
mbligh16c722d2008-03-05 00:58:44 +00002117
mblighd5c95802008-03-05 00:33:46 +00002118
mbligh4608b002010-01-05 18:22:35 +00002119 @classmethod
2120 def _decrement_running_processes(cls):
2121 cls._num_running_processes -= 1
showard8cc058f2009-09-08 16:26:33 +00002122
2123
mbligh4608b002010-01-05 18:22:35 +00002124 @classmethod
2125 def _max_processes(cls):
2126 raise NotImplementedError
2127
2128
2129 @classmethod
2130 def _can_run_new_process(cls):
2131 return cls._num_running_processes < cls._max_processes()
2132
2133
2134 def _process_started(self):
2135 return bool(self.monitor)
2136
2137
2138 def tick(self):
2139 # override tick to keep trying to start until the process count goes
2140 # down and we can, at which point we revert to default behavior
2141 if self._process_started():
2142 super(SelfThrottledPostJobTask, self).tick()
2143 else:
2144 self._try_starting_process()
2145
2146
2147 def run(self):
2148 # override run() to not actually run unless we can
2149 self._try_starting_process()
2150
2151
2152 def _try_starting_process(self):
2153 if not self._can_run_new_process():
showard775300b2009-09-09 15:30:50 +00002154 return
2155
mbligh4608b002010-01-05 18:22:35 +00002156 # actually run the command
2157 super(SelfThrottledPostJobTask, self).run()
jamesren25663562010-04-27 18:00:55 +00002158 if self._process_started():
2159 self._increment_running_processes()
mblighd5c95802008-03-05 00:33:46 +00002160
mblighd5c95802008-03-05 00:33:46 +00002161
mbligh4608b002010-01-05 18:22:35 +00002162 def finished(self, success):
2163 super(SelfThrottledPostJobTask, self).finished(success)
2164 if self._process_started():
2165 self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002166
showard21baa452008-10-21 00:08:39 +00002167
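# Illustrative sketch only: SelfThrottledPostJobTask throttles with a
# per-class counter checked against _max_processes(). This hypothetical
# miniature shows the same pattern in isolation; in the real subclasses below
# the limits come from scheduler_config (max_parse_processes and
# max_transfer_processes).
class _ExampleThrottledStarter(object):
    _num_running = 0
    _MAX_RUNNING = 2  # assumed limit, for illustration only


    @classmethod
    def _can_start(cls):
        return cls._num_running < cls._MAX_RUNNING


    def try_start(self):
        # mirrors _try_starting_process(): only start while under the limit
        if not self._can_start():
            return False
        type(self)._num_running += 1
        return True


    def finish(self):
        # mirrors finished(): release the slot once the process completes
        type(self)._num_running -= 1

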
mbligh4608b002010-01-05 18:22:35 +00002168class FinalReparseTask(SelfThrottledPostJobTask):
showardd1195652009-12-08 22:21:02 +00002169 def __init__(self, queue_entries):
2170 super(FinalReparseTask, self).__init__(queue_entries,
2171 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002172 # don't use _set_ids, since we don't want to set the host_ids
2173 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002174
2175
2176 def _generate_command(self, results_dir):
mbligh4608b002010-01-05 18:22:35 +00002177 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd1195652009-12-08 22:21:02 +00002178 results_dir]
2179
2180
2181 @property
2182 def num_processes(self):
2183 return 0 # don't include parser processes in accounting
2184
2185
2186 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002187 return drone_manager.PARSER_PID_FILE
showardd1195652009-12-08 22:21:02 +00002188
2189
showard97aed502008-11-04 02:01:24 +00002190 @classmethod
mbligh4608b002010-01-05 18:22:35 +00002191 def _max_processes(cls):
2192 return scheduler_config.config.max_parse_processes
showard97aed502008-11-04 02:01:24 +00002193
2194
2195 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002196 self._check_queue_entry_statuses(
2197 self.queue_entries,
2198 allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
showard8cc058f2009-09-08 16:26:33 +00002199
showard97aed502008-11-04 02:01:24 +00002200 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002201
2202
2203 def epilog(self):
2204 super(FinalReparseTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002205 self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002206
2207
mbligh4608b002010-01-05 18:22:35 +00002208class ArchiveResultsTask(SelfThrottledPostJobTask):
showarde1575b52010-01-15 00:21:12 +00002209 _ARCHIVING_FAILED_FILE = '.archiver_failed'
2210
mbligh4608b002010-01-05 18:22:35 +00002211 def __init__(self, queue_entries):
2212 super(ArchiveResultsTask, self).__init__(queue_entries,
2213 log_file_name='.archiving.log')
2214 # don't use _set_ids, since we don't want to set the host_ids
2215 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00002216
2217
mbligh4608b002010-01-05 18:22:35 +00002218 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002219 return drone_manager.ARCHIVER_PID_FILE
showard97aed502008-11-04 02:01:24 +00002220
2221
mbligh4608b002010-01-05 18:22:35 +00002222 def _generate_command(self, results_dir):
                     2223         return [_autoserv_path, '-p',
2224 '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
mblighe0cbc912010-03-11 18:03:07 +00002225 '--use-existing-results', '--control-filename=control.archive',
showard948eb302010-01-15 00:16:20 +00002226 os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
2227 'archive_results.control.srv')]
showard97aed502008-11-04 02:01:24 +00002228
2229
mbligh4608b002010-01-05 18:22:35 +00002230 @classmethod
2231 def _max_processes(cls):
2232 return scheduler_config.config.max_transfer_processes
showarda9545c02009-12-18 22:44:26 +00002233
2234
2235 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002236 self._check_queue_entry_statuses(
2237 self.queue_entries,
2238 allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
2239
2240 super(ArchiveResultsTask, self).prolog()
showarda9545c02009-12-18 22:44:26 +00002241
2242
mbligh4608b002010-01-05 18:22:35 +00002243 def epilog(self):
2244 super(ArchiveResultsTask, self).epilog()
showard4076c632010-01-15 20:28:49 +00002245 if not self.success and self._paired_with_monitor().has_process():
showarde1575b52010-01-15 00:21:12 +00002246 failed_file = os.path.join(self._working_directory(),
2247 self._ARCHIVING_FAILED_FILE)
2248 paired_process = self._paired_with_monitor().get_process()
2249 _drone_manager.write_lines_to_file(
2250 failed_file, ['Archiving failed with exit code %s'
2251 % self.monitor.exit_code()],
2252 paired_with_process=paired_process)
mbligh4608b002010-01-05 18:22:35 +00002253 self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002254
2255
mbligh36768f02008-02-22 18:28:33 +00002256if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002257 main()