mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard402934a2009-12-21 22:20:47 +00008import common
showardef519212009-05-08 02:29:53 +00009import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
Eric Li6f27d4f2010-09-29 10:55:17 -070010import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback, urllib
showardf13a9e22009-12-18 22:54:09 +000011import itertools, logging, weakref, gc
showard402934a2009-12-21 22:20:47 +000012
mbligh8bcd23a2009-02-03 19:14:06 +000013import MySQLdb
showard402934a2009-12-21 22:20:47 +000014
showard043c62a2009-06-10 19:48:57 +000015from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000016from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000017
18import django.db
19
showard136e6dc2009-06-10 19:38:49 +000020from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000021from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000022from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000023from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
jamesrendd855242010-03-02 22:23:44 +000024from autotest_lib.frontend.afe import model_attributes
showard170873e2009-01-07 00:22:26 +000025from autotest_lib.scheduler import drone_manager, drones, email_manager
Dale Curtisaa513362011-03-01 17:27:44 -080026from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000027from autotest_lib.scheduler import status_server, scheduler_config
jamesrenc44ae992010-02-19 00:12:54 +000028from autotest_lib.scheduler import scheduler_models
showard549afad2009-08-20 23:33:36 +000029BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
30PID_FILE_PREFIX = 'monitor_db'
mblighb090f142008-02-27 21:33:46 +000031
mbligh36768f02008-02-22 18:28:33 +000032RESULTS_DIR = '.'
33AUTOSERV_NICE_LEVEL = 10
showard170873e2009-01-07 00:22:26 +000034DB_CONFIG_SECTION = 'AUTOTEST_WEB'
mbligh36768f02008-02-22 18:28:33 +000035AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
36
37if os.environ.has_key('AUTOTEST_DIR'):
jadmanski0afbb632008-06-06 21:10:57 +000038 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
mbligh36768f02008-02-22 18:28:33 +000039AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
40AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
41
42if AUTOTEST_SERVER_DIR not in sys.path:
jadmanski0afbb632008-06-06 21:10:57 +000043 sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000044
showard35162b02009-03-03 02:17:30 +000045# error message to leave in results dir when an autoserv process disappears
46# mysteriously
47_LOST_PROCESS_ERROR = """\
48Autoserv failed abnormally during execution for this job, probably due to a
49system error on the Autotest server. Full results may not be available. Sorry.
50"""
51
mbligh6f8bab42008-02-29 22:45:14 +000052_db = None
mbligh36768f02008-02-22 18:28:33 +000053_shutdown = False
showard170873e2009-01-07 00:22:26 +000054_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
mbligh4314a712008-02-29 22:44:30 +000055_testing_mode = False
jamesrenc44ae992010-02-19 00:12:54 +000056_drone_manager = None
mbligh36768f02008-02-22 18:28:33 +000057
Eric Lie0493a42010-11-15 13:05:43 -080058def _parser_path_default(install_dir):
59 return os.path.join(install_dir, 'tko', 'parse')
60_parser_path_func = utils.import_site_function(
61 __file__, 'autotest_lib.scheduler.site_monitor_db',
62 'parser_path', _parser_path_default)
63_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
64
mbligh36768f02008-02-22 18:28:33 +000065
showardec6a3b92009-09-25 20:29:13 +000066def _get_pidfile_timeout_secs():
67 """@returns How long to wait for autoserv to write pidfile."""
68 pidfile_timeout_mins = global_config.global_config.get_config_value(
69 scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
70 return pidfile_timeout_mins * 60
71
72
mbligh83c1e9e2009-05-01 23:10:41 +000073def _site_init_monitor_db_dummy():
74 return {}
75
76
jamesren76fcf192010-04-21 20:39:50 +000077def _verify_default_drone_set_exists():
78 if (models.DroneSet.drone_sets_enabled() and
79 not models.DroneSet.default_drone_set_name()):
Dale Curtisaa513362011-03-01 17:27:44 -080080 raise host_scheduler.SchedulerError(
81 'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000082
83
84def _sanity_check():
85 """Make sure the configs are consistent before starting the scheduler"""
86 _verify_default_drone_set_exists()
87
88
mbligh36768f02008-02-22 18:28:33 +000089def main():
showard27f33872009-04-07 18:20:53 +000090 try:
showard549afad2009-08-20 23:33:36 +000091 try:
92 main_without_exception_handling()
93 except SystemExit:
94 raise
95 except:
96 logging.exception('Exception escaping in monitor_db')
97 raise
98 finally:
99 utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000100
101
102def main_without_exception_handling():
showard136e6dc2009-06-10 19:38:49 +0000103 setup_logging()
mbligh36768f02008-02-22 18:28:33 +0000104
showard136e6dc2009-06-10 19:38:49 +0000105 usage = 'usage: %prog [options] results_dir'
jadmanski0afbb632008-06-06 21:10:57 +0000106 parser = optparse.OptionParser(usage)
107 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
108 action='store_true')
jadmanski0afbb632008-06-06 21:10:57 +0000109 parser.add_option('--test', help='Indicate that scheduler is under ' +
110 'test and should use dummy autoserv and no parsing',
111 action='store_true')
112 (options, args) = parser.parse_args()
113 if len(args) != 1:
114 parser.print_usage()
115 return
mbligh36768f02008-02-22 18:28:33 +0000116
showard5613c662009-06-08 23:30:33 +0000117 scheduler_enabled = global_config.global_config.get_config_value(
118 scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
119
120 if not scheduler_enabled:
121 msg = ("Scheduler not enabled, set enable_scheduler to true in the "
122 "global_config's SCHEDULER section to enabled it. Exiting.")
mbligh6fbdb802009-08-03 16:42:55 +0000123 logging.error(msg)
showard5613c662009-06-08 23:30:33 +0000124 sys.exit(1)
125
jadmanski0afbb632008-06-06 21:10:57 +0000126 global RESULTS_DIR
127 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +0000128
mbligh83c1e9e2009-05-01 23:10:41 +0000129 site_init = utils.import_site_function(__file__,
130 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
131 _site_init_monitor_db_dummy)
132 site_init()
133
showardcca334f2009-03-12 20:38:34 +0000134 # Change the cwd while running to avoid issues in case we were launched from
135 # somewhere odd (such as a random NFS home directory of the person running
136 # sudo to launch us as the appropriate user).
137 os.chdir(RESULTS_DIR)
138
jamesrenc7d387e2010-08-10 21:48:30 +0000139 # This is helpful for debugging why processes the scheduler launches are
 140 # misbehaving.
141 logging.info('os.environ: %s', os.environ)
showardc85c21b2008-11-24 22:17:37 +0000142
jadmanski0afbb632008-06-06 21:10:57 +0000143 if options.test:
144 global _autoserv_path
145 _autoserv_path = 'autoserv_dummy'
146 global _testing_mode
147 _testing_mode = True
mbligh36768f02008-02-22 18:28:33 +0000148
jamesrenc44ae992010-02-19 00:12:54 +0000149 server = status_server.StatusServer()
showardd1ee1dd2009-01-07 21:33:08 +0000150 server.start()
151
jadmanski0afbb632008-06-06 21:10:57 +0000152 try:
jamesrenc44ae992010-02-19 00:12:54 +0000153 initialize()
showardc5afc462009-01-13 00:09:39 +0000154 dispatcher = Dispatcher()
showard915958d2009-04-22 21:00:58 +0000155 dispatcher.initialize(recover_hosts=options.recover_hosts)
showardc5afc462009-01-13 00:09:39 +0000156
Eric Lia82dc352011-02-23 13:15:52 -0800157 while not _shutdown and not server._shutdown_scheduler:
jadmanski0afbb632008-06-06 21:10:57 +0000158 dispatcher.tick()
showardd1ee1dd2009-01-07 21:33:08 +0000159 time.sleep(scheduler_config.config.tick_pause_sec)
jadmanski0afbb632008-06-06 21:10:57 +0000160 except:
showard170873e2009-01-07 00:22:26 +0000161 email_manager.manager.log_stacktrace(
162 "Uncaught exception; terminating monitor_db")
jadmanski0afbb632008-06-06 21:10:57 +0000163
showard170873e2009-01-07 00:22:26 +0000164 email_manager.manager.send_queued_emails()
showard55b4b542009-01-08 23:30:30 +0000165 server.shutdown()
showard170873e2009-01-07 00:22:26 +0000166 _drone_manager.shutdown()
jadmanski0afbb632008-06-06 21:10:57 +0000167 _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000168
169
showard136e6dc2009-06-10 19:38:49 +0000170def setup_logging():
171 log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
172 log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
173 logging_manager.configure_logging(
174 scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
175 logfile_name=log_name)
176
177
mbligh36768f02008-02-22 18:28:33 +0000178def handle_sigint(signum, frame):
jadmanski0afbb632008-06-06 21:10:57 +0000179 global _shutdown
180 _shutdown = True
showardb18134f2009-03-20 20:52:18 +0000181 logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000182
183
jamesrenc44ae992010-02-19 00:12:54 +0000184def initialize():
showardb18134f2009-03-20 20:52:18 +0000185 logging.info("%s> dispatcher starting", time.strftime("%X %x"))
186 logging.info("My PID is %d", os.getpid())
mbligh36768f02008-02-22 18:28:33 +0000187
showard8de37132009-08-31 18:33:08 +0000188 if utils.program_is_alive(PID_FILE_PREFIX):
showard549afad2009-08-20 23:33:36 +0000189 logging.critical("monitor_db already running, aborting!")
190 sys.exit(1)
191 utils.write_pid(PID_FILE_PREFIX)
mblighfb676032009-04-01 18:25:38 +0000192
showardb1e51872008-10-07 11:08:18 +0000193 if _testing_mode:
194 global_config.global_config.override_config_value(
showard170873e2009-01-07 00:22:26 +0000195 DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')
showardb1e51872008-10-07 11:08:18 +0000196
jadmanski0afbb632008-06-06 21:10:57 +0000197 os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
198 global _db
showard170873e2009-01-07 00:22:26 +0000199 _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
showardb21b8c82009-12-07 19:39:39 +0000200 _db.connect(db_type='django')
mbligh36768f02008-02-22 18:28:33 +0000201
showardfa8629c2008-11-04 16:51:23 +0000202 # ensure Django connection is in autocommit
203 setup_django_environment.enable_autocommit()
showard844960a2009-05-29 18:41:18 +0000204 # bypass the readonly connection
205 readonly_connection.ReadOnlyConnection.set_globally_disabled(True)
showardfa8629c2008-11-04 16:51:23 +0000206
showardb18134f2009-03-20 20:52:18 +0000207 logging.info("Setting signal handler")
jadmanski0afbb632008-06-06 21:10:57 +0000208 signal.signal(signal.SIGINT, handle_sigint)
209
jamesrenc44ae992010-02-19 00:12:54 +0000210 initialize_globals()
211 scheduler_models.initialize()
212
showardd1ee1dd2009-01-07 21:33:08 +0000213 drones = global_config.global_config.get_config_value(
214 scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
215 drone_list = [hostname.strip() for hostname in drones.split(',')]
showard170873e2009-01-07 00:22:26 +0000216 results_host = global_config.global_config.get_config_value(
showardd1ee1dd2009-01-07 21:33:08 +0000217 scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
showard170873e2009-01-07 00:22:26 +0000218 _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)
219
showardb18134f2009-03-20 20:52:18 +0000220 logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000221
222
jamesrenc44ae992010-02-19 00:12:54 +0000223def initialize_globals():
224 global _drone_manager
225 _drone_manager = drone_manager.instance()
226
227
showarded2afea2009-07-07 20:54:07 +0000228def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
229 verbose=True):
showardf1ae3542009-05-11 19:26:02 +0000230 """
231 @returns The autoserv command line as a list of executable + parameters.
232
233 @param machines - string - A machine or comma separated list of machines
234 for the (-m) flag.
showardf1ae3542009-05-11 19:26:02 +0000235 @param extra_args - list - Additional arguments to pass to autoserv.
236 @param job - Job object - If supplied, -u owner and -l name parameters
237 will be added.
238 @param queue_entry - A HostQueueEntry object - If supplied and no Job
239 object was supplied, this will be used to lookup the Job object.
240 """
showarda9545c02009-12-18 22:44:26 +0000241 autoserv_argv = [_autoserv_path, '-p',
showarded2afea2009-07-07 20:54:07 +0000242 '-r', drone_manager.WORKING_DIRECTORY]
showarda9545c02009-12-18 22:44:26 +0000243 if machines:
244 autoserv_argv += ['-m', machines]
showard87ba02a2009-04-20 19:37:32 +0000245 if job or queue_entry:
246 if not job:
247 job = queue_entry.job
248 autoserv_argv += ['-u', job.owner, '-l', job.name]
showarde9c69362009-06-30 01:58:03 +0000249 if verbose:
250 autoserv_argv.append('--verbose')
showard87ba02a2009-04-20 19:37:32 +0000251 return autoserv_argv + extra_args
252
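# Rough usage sketch for _autoserv_command_line() (illustrative only; `entry`
# stands for some HostQueueEntry with an assigned job):
#
#     argv = _autoserv_command_line('host1,host2', extra_args=[],
#                                   queue_entry=entry)
#     # argv == [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY,
#     #          '-m', 'host1,host2', '-u', entry.job.owner,
#     #          '-l', entry.job.name, '--verbose']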
253
Simran Basia858a232012-08-21 11:04:37 -0700254class BaseDispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000255 def __init__(self):
256 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000257 self._last_clean_time = time.time()
Dale Curtisaa513362011-03-01 17:27:44 -0800258 self._host_scheduler = host_scheduler.HostScheduler(_db)
mblighf3294cc2009-04-08 21:17:38 +0000259 user_cleanup_time = scheduler_config.config.clean_interval
260 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
261 _db, user_cleanup_time)
262 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000263 self._host_agents = {}
264 self._queue_entry_agents = {}
showardf13a9e22009-12-18 22:54:09 +0000265 self._tick_count = 0
266 self._last_garbage_stats_time = time.time()
267 self._seconds_between_garbage_stats = 60 * (
268 global_config.global_config.get_config_value(
269 scheduler_config.CONFIG_SECTION,
Dale Curtis456d3c12011-07-19 11:42:51 -0700270 'gc_stats_interval_mins', type=int, default=6*60))
Simran Basi0ec94dd2012-08-28 09:50:10 -0700271 self._tick_debug = global_config.global_config.get_config_value(
272 scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
273 default=False)
Simran Basidef92872012-09-20 13:34:34 -0700274 self._extra_debugging = global_config.global_config.get_config_value(
275 scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
276 default=False)
mbligh36768f02008-02-22 18:28:33 +0000277
mbligh36768f02008-02-22 18:28:33 +0000278
showard915958d2009-04-22 21:00:58 +0000279 def initialize(self, recover_hosts=True):
280 self._periodic_cleanup.initialize()
281 self._24hr_upkeep.initialize()
282
jadmanski0afbb632008-06-06 21:10:57 +0000283 # always recover processes
284 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000285
jadmanski0afbb632008-06-06 21:10:57 +0000286 if recover_hosts:
287 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000288
jamesrenc44ae992010-02-19 00:12:54 +0000289 self._host_scheduler.recovery_on_startup()
290
mbligh36768f02008-02-22 18:28:33 +0000291
Simran Basi0ec94dd2012-08-28 09:50:10 -0700292 def _log_tick_msg(self, msg):
293 if self._tick_debug:
294 logging.debug(msg)
295
296
Simran Basidef92872012-09-20 13:34:34 -0700297 def _log_extra_msg(self, msg):
298 if self._extra_debugging:
299 logging.debug(msg)
300
301
jadmanski0afbb632008-06-06 21:10:57 +0000302 def tick(self):
Simran Basi0ec94dd2012-08-28 09:50:10 -0700303 """
 304 Run one scheduler cycle, logging when each major step begins (when
 305 tick_debug is enabled) so we can figure out where most of the tick
 306 time is being spent.
 307 """
Simran Basi3f6717d2012-09-13 15:21:22 -0700308 self._log_tick_msg('Starting new tick; calling _garbage_collection().')
showardf13a9e22009-12-18 22:54:09 +0000309 self._garbage_collection()
Simran Basi3f6717d2012-09-13 15:21:22 -0700310 self._log_tick_msg('Calling _drone_manager.refresh().')
showard170873e2009-01-07 00:22:26 +0000311 _drone_manager.refresh()
Simran Basi3f6717d2012-09-13 15:21:22 -0700312 self._log_tick_msg('Calling _run_cleanup().')
mblighf3294cc2009-04-08 21:17:38 +0000313 self._run_cleanup()
Simran Basi3f6717d2012-09-13 15:21:22 -0700314 self._log_tick_msg('Calling _find_aborting().')
jadmanski0afbb632008-06-06 21:10:57 +0000315 self._find_aborting()
Simran Basi3f6717d2012-09-13 15:21:22 -0700316 self._log_tick_msg('Calling _process_recurring_runs().')
showard29f7cd22009-04-29 21:16:24 +0000317 self._process_recurring_runs()
Simran Basi3f6717d2012-09-13 15:21:22 -0700318 self._log_tick_msg('Calling _schedule_delay_tasks().')
showard8cc058f2009-09-08 16:26:33 +0000319 self._schedule_delay_tasks()
Simran Basi3f6717d2012-09-13 15:21:22 -0700320 self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
showard8cc058f2009-09-08 16:26:33 +0000321 self._schedule_running_host_queue_entries()
Simran Basi3f6717d2012-09-13 15:21:22 -0700322 self._log_tick_msg('Calling _schedule_special_tasks().')
showard8cc058f2009-09-08 16:26:33 +0000323 self._schedule_special_tasks()
Simran Basi3f6717d2012-09-13 15:21:22 -0700324 self._log_tick_msg('Calling _schedule_new_jobs().')
showard65db3932009-10-28 19:54:35 +0000325 self._schedule_new_jobs()
Simran Basi3f6717d2012-09-13 15:21:22 -0700326 self._log_tick_msg('Calling _handle_agents().')
jadmanski0afbb632008-06-06 21:10:57 +0000327 self._handle_agents()
Simran Basi3f6717d2012-09-13 15:21:22 -0700328 self._log_tick_msg('Calling _host_scheduler.tick().')
jamesrene21bf412010-02-26 02:30:07 +0000329 self._host_scheduler.tick()
Simran Basi3f6717d2012-09-13 15:21:22 -0700330 self._log_tick_msg('Calling _drone_manager.execute_actions().')
showard170873e2009-01-07 00:22:26 +0000331 _drone_manager.execute_actions()
Simran Basi3f6717d2012-09-13 15:21:22 -0700332 self._log_tick_msg('Calling '
Simran Basi0ec94dd2012-08-28 09:50:10 -0700333 'email_manager.manager.send_queued_emails().')
showard170873e2009-01-07 00:22:26 +0000334 email_manager.manager.send_queued_emails()
Simran Basi3f6717d2012-09-13 15:21:22 -0700335 self._log_tick_msg('Calling django.db.reset_queries().')
showard402934a2009-12-21 22:20:47 +0000336 django.db.reset_queries()
showardf13a9e22009-12-18 22:54:09 +0000337 self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000338
showard97aed502008-11-04 02:01:24 +0000339
mblighf3294cc2009-04-08 21:17:38 +0000340 def _run_cleanup(self):
341 self._periodic_cleanup.run_cleanup_maybe()
342 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000343
mbligh36768f02008-02-22 18:28:33 +0000344
showardf13a9e22009-12-18 22:54:09 +0000345 def _garbage_collection(self):
346 threshold_time = time.time() - self._seconds_between_garbage_stats
347 if threshold_time < self._last_garbage_stats_time:
348 # Don't generate these reports very often.
349 return
350
351 self._last_garbage_stats_time = time.time()
 352 # Force a full collection (because we can; it doesn't hurt
 353 # at this interval).
354 gc.collect()
355 logging.info('Logging garbage collector stats on tick %d.',
356 self._tick_count)
357 gc_stats._log_garbage_collector_stats()
358
359
showard170873e2009-01-07 00:22:26 +0000360 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
361 for object_id in object_ids:
362 agent_dict.setdefault(object_id, set()).add(agent)
363
364
365 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
366 for object_id in object_ids:
367 assert object_id in agent_dict
368 agent_dict[object_id].remove(agent)
369
370
showardd1195652009-12-08 22:21:02 +0000371 def add_agent_task(self, agent_task):
372 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000373 self._agents.append(agent)
374 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000375 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
376 self._register_agent_for_ids(self._queue_entry_agents,
377 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000378
showard170873e2009-01-07 00:22:26 +0000379
380 def get_agents_for_entry(self, queue_entry):
381 """
382 Find agents corresponding to the specified queue_entry.
383 """
showardd3dc1992009-04-22 21:01:40 +0000384 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000385
386
387 def host_has_agent(self, host):
388 """
389 Determine if there is currently an Agent present using this host.
390 """
391 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000392
393
jadmanski0afbb632008-06-06 21:10:57 +0000394 def remove_agent(self, agent):
395 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000396 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
397 agent)
398 self._unregister_agent_for_ids(self._queue_entry_agents,
399 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000400
401
showard8cc058f2009-09-08 16:26:33 +0000402 def _host_has_scheduled_special_task(self, host):
403 return bool(models.SpecialTask.objects.filter(host__id=host.id,
404 is_active=False,
405 is_complete=False))
406
407
jadmanski0afbb632008-06-06 21:10:57 +0000408 def _recover_processes(self):
showardd1195652009-12-08 22:21:02 +0000409 agent_tasks = self._create_recovery_agent_tasks()
410 self._register_pidfiles(agent_tasks)
showard170873e2009-01-07 00:22:26 +0000411 _drone_manager.refresh()
showardd1195652009-12-08 22:21:02 +0000412 self._recover_tasks(agent_tasks)
showard8cc058f2009-09-08 16:26:33 +0000413 self._recover_pending_entries()
showardb8900452009-10-12 20:31:01 +0000414 self._check_for_unrecovered_verifying_entries()
showard170873e2009-01-07 00:22:26 +0000415 self._reverify_remaining_hosts()
416 # reinitialize drones after killing orphaned processes, since they can
417 # leave around files when they die
418 _drone_manager.execute_actions()
419 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000420
showard170873e2009-01-07 00:22:26 +0000421
showardd1195652009-12-08 22:21:02 +0000422 def _create_recovery_agent_tasks(self):
423 return (self._get_queue_entry_agent_tasks()
424 + self._get_special_task_agent_tasks(is_active=True))
425
426
427 def _get_queue_entry_agent_tasks(self):
428 # host queue entry statuses handled directly by AgentTasks (Verifying is
429 # handled through SpecialTasks, so is not listed here)
430 statuses = (models.HostQueueEntry.Status.STARTING,
431 models.HostQueueEntry.Status.RUNNING,
432 models.HostQueueEntry.Status.GATHERING,
mbligh4608b002010-01-05 18:22:35 +0000433 models.HostQueueEntry.Status.PARSING,
434 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000435 status_list = ','.join("'%s'" % status for status in statuses)
jamesrenc44ae992010-02-19 00:12:54 +0000436 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardd1195652009-12-08 22:21:02 +0000437 where='status IN (%s)' % status_list)
438
439 agent_tasks = []
440 used_queue_entries = set()
441 for entry in queue_entries:
442 if self.get_agents_for_entry(entry):
443 # already being handled
444 continue
445 if entry in used_queue_entries:
446 # already picked up by a synchronous job
447 continue
448 agent_task = self._get_agent_task_for_queue_entry(entry)
449 agent_tasks.append(agent_task)
450 used_queue_entries.update(agent_task.queue_entries)
451 return agent_tasks
showard170873e2009-01-07 00:22:26 +0000452
453
showardd1195652009-12-08 22:21:02 +0000454 def _get_special_task_agent_tasks(self, is_active=False):
455 special_tasks = models.SpecialTask.objects.filter(
456 is_active=is_active, is_complete=False)
457 return [self._get_agent_task_for_special_task(task)
458 for task in special_tasks]
459
460
461 def _get_agent_task_for_queue_entry(self, queue_entry):
462 """
463 Construct an AgentTask instance for the given active HostQueueEntry,
464 if one can currently run it.
465 @param queue_entry: a HostQueueEntry
466 @returns an AgentTask to run the queue entry
467 """
468 task_entries = queue_entry.job.get_group_entries(queue_entry)
469 self._check_for_duplicate_host_entries(task_entries)
470
471 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
472 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000473 if queue_entry.is_hostless():
474 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000475 return QueueTask(queue_entries=task_entries)
476 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
477 return GatherLogsTask(queue_entries=task_entries)
478 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
479 return FinalReparseTask(queue_entries=task_entries)
mbligh4608b002010-01-05 18:22:35 +0000480 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
481 return ArchiveResultsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000482
Dale Curtisaa513362011-03-01 17:27:44 -0800483 raise host_scheduler.SchedulerError(
484 '_get_agent_task_for_queue_entry got entry with '
485 'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000486
487
488 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000489 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
490 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000491 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000492 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000493 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000494 if using_host:
showardd1195652009-12-08 22:21:02 +0000495 self._assert_host_has_no_agent(task_entry)
496
497
498 def _assert_host_has_no_agent(self, entry):
499 """
500 @param entry: a HostQueueEntry or a SpecialTask
501 """
502 if self.host_has_agent(entry.host):
503 agent = tuple(self._host_agents.get(entry.host.id))[0]
Dale Curtisaa513362011-03-01 17:27:44 -0800504 raise host_scheduler.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000505 'While scheduling %s, host %s already has a host agent %s'
506 % (entry, entry.host, agent.task))
507
508
509 def _get_agent_task_for_special_task(self, special_task):
510 """
511 Construct an AgentTask class to run the given SpecialTask and add it
512 to this dispatcher.
513 @param special_task: a models.SpecialTask instance
514 @returns an AgentTask to run this SpecialTask
515 """
516 self._assert_host_has_no_agent(special_task)
517
518 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
519 for agent_task_class in special_agent_task_classes:
520 if agent_task_class.TASK_TYPE == special_task.task:
521 return agent_task_class(task=special_task)
522
Dale Curtisaa513362011-03-01 17:27:44 -0800523 raise host_scheduler.SchedulerError(
524 'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000525
526
527 def _register_pidfiles(self, agent_tasks):
528 for agent_task in agent_tasks:
529 agent_task.register_necessary_pidfiles()
530
531
532 def _recover_tasks(self, agent_tasks):
533 orphans = _drone_manager.get_orphaned_autoserv_processes()
534
535 for agent_task in agent_tasks:
536 agent_task.recover()
537 if agent_task.monitor and agent_task.monitor.has_process():
538 orphans.discard(agent_task.monitor.get_process())
539 self.add_agent_task(agent_task)
540
541 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000542
543
showard8cc058f2009-09-08 16:26:33 +0000544 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000545 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
546 % status):
showard0db3d432009-10-12 20:29:15 +0000547 if entry.status == status and not self.get_agents_for_entry(entry):
548 # The status can change during iteration, e.g., if job.run()
549 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000550 yield entry
551
552
showard6878e8b2009-07-20 22:37:45 +0000553 def _check_for_remaining_orphan_processes(self, orphans):
554 if not orphans:
555 return
556 subject = 'Unrecovered orphan autoserv processes remain'
557 message = '\n'.join(str(process) for process in orphans)
558 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000559
560 die_on_orphans = global_config.global_config.get_config_value(
561 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
562
563 if die_on_orphans:
564 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000565
showard170873e2009-01-07 00:22:26 +0000566
showard8cc058f2009-09-08 16:26:33 +0000567 def _recover_pending_entries(self):
568 for entry in self._get_unassigned_entries(
569 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000570 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000571 entry.on_pending()
572
573
showardb8900452009-10-12 20:31:01 +0000574 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000575 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000576 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
577 unrecovered_hqes = []
578 for queue_entry in queue_entries:
579 special_tasks = models.SpecialTask.objects.filter(
580 task__in=(models.SpecialTask.Task.CLEANUP,
581 models.SpecialTask.Task.VERIFY),
582 queue_entry__id=queue_entry.id,
583 is_complete=False)
584 if special_tasks.count() == 0:
585 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000586
showardb8900452009-10-12 20:31:01 +0000587 if unrecovered_hqes:
588 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Dale Curtisaa513362011-03-01 17:27:44 -0800589 raise host_scheduler.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000590 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000591 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000592
593
showard65db3932009-10-28 19:54:35 +0000594 def _get_prioritized_special_tasks(self):
595 """
596 Returns all queued SpecialTasks prioritized for repair first, then
597 cleanup, then verify.
598 """
599 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
600 is_complete=False,
601 host__locked=False)
602 # exclude hosts with active queue entries unless the SpecialTask is for
603 # that queue entry
showard7e67b432010-01-20 01:13:04 +0000604 queued_tasks = models.SpecialTask.objects.add_join(
showardeab66ce2009-12-23 00:03:56 +0000605 queued_tasks, 'afe_host_queue_entries', 'host_id',
606 join_condition='afe_host_queue_entries.active',
showard7e67b432010-01-20 01:13:04 +0000607 join_from_key='host_id', force_left_join=True)
showard65db3932009-10-28 19:54:35 +0000608 queued_tasks = queued_tasks.extra(
showardeab66ce2009-12-23 00:03:56 +0000609 where=['(afe_host_queue_entries.id IS NULL OR '
610 'afe_host_queue_entries.id = '
611 'afe_special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000612
showard65db3932009-10-28 19:54:35 +0000613 # reorder tasks by priority
614 task_priority_order = [models.SpecialTask.Task.REPAIR,
615 models.SpecialTask.Task.CLEANUP,
616 models.SpecialTask.Task.VERIFY]
617 def task_priority_key(task):
618 return task_priority_order.index(task.task)
619 return sorted(queued_tasks, key=task_priority_key)
620
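    # For example, _get_prioritized_special_tasks() would sort queued tasks
    # [VERIFY(host1), REPAIR(host2), CLEANUP(host1)] into
    # [REPAIR(host2), CLEANUP(host1), VERIFY(host1)], since
    # task_priority_key() maps REPAIR -> 0, CLEANUP -> 1, VERIFY -> 2.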
621
showard65db3932009-10-28 19:54:35 +0000622 def _schedule_special_tasks(self):
623 """
624 Execute queued SpecialTasks that are ready to run on idle hosts.
625 """
626 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000627 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000628 continue
showardd1195652009-12-08 22:21:02 +0000629 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000630
631
showard170873e2009-01-07 00:22:26 +0000632 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000633 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000634 # should never happen
showarded2afea2009-07-07 20:54:07 +0000635 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000636 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000637 self._reverify_hosts_where(
showard8cc058f2009-09-08 16:26:33 +0000638 "status IN ('Repairing', 'Verifying', 'Cleaning')",
showarded2afea2009-07-07 20:54:07 +0000639 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000640
641
jadmanski0afbb632008-06-06 21:10:57 +0000642 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000643 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700644 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000645 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000646 if self.host_has_agent(host):
647 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000648 continue
showard8cc058f2009-09-08 16:26:33 +0000649 if self._host_has_scheduled_special_task(host):
650 # host will have a special task scheduled on the next cycle
651 continue
showard170873e2009-01-07 00:22:26 +0000652 if print_message:
showardb18134f2009-03-20 20:52:18 +0000653 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000654 models.SpecialTask.objects.create(
655 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000656 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000657
658
jadmanski0afbb632008-06-06 21:10:57 +0000659 def _recover_hosts(self):
660 # recover "Repair Failed" hosts
661 message = 'Reverifying dead host %s'
662 self._reverify_hosts_where("status = 'Repair Failed'",
663 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000664
665
showard04c82c52008-05-29 19:38:12 +0000666
showardb95b1bd2008-08-15 18:11:04 +0000667 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000668 # prioritize by job priority, then non-metahost over metahost, then FIFO
jamesrenc44ae992010-02-19 00:12:54 +0000669 return list(scheduler_models.HostQueueEntry.fetch(
showardeab66ce2009-12-23 00:03:56 +0000670 joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000671 where='NOT complete AND NOT active AND status="Queued"',
showardeab66ce2009-12-23 00:03:56 +0000672 order_by='afe_jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000673
674
showard89f84db2009-03-12 20:39:13 +0000675 def _refresh_pending_queue_entries(self):
676 """
 677 Look up the pending HostQueueEntries and call our HostScheduler
678 refresh() method given that list. Return the list.
679
680 @returns A list of pending HostQueueEntries sorted in priority order.
681 """
showard63a34772008-08-18 19:32:50 +0000682 queue_entries = self._get_pending_queue_entries()
683 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000684 return []
showardb95b1bd2008-08-15 18:11:04 +0000685
showard63a34772008-08-18 19:32:50 +0000686 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000687
showard89f84db2009-03-12 20:39:13 +0000688 return queue_entries
689
690
691 def _schedule_atomic_group(self, queue_entry):
692 """
693 Schedule the given queue_entry on an atomic group of hosts.
694
695 Returns immediately if there are insufficient available hosts.
696
697 Creates new HostQueueEntries based off of queue_entry for the
698 scheduled hosts and starts them all running.
699 """
700 # This is a virtual host queue entry representing an entire
701 # atomic group, find a group and schedule their hosts.
702 group_hosts = self._host_scheduler.find_eligible_atomic_group(
703 queue_entry)
704 if not group_hosts:
705 return
showardcbe6f942009-06-17 19:33:49 +0000706
707 logging.info('Expanding atomic group entry %s with hosts %s',
708 queue_entry,
709 ', '.join(host.hostname for host in group_hosts))
jamesren883492a2010-02-12 00:45:18 +0000710
showard89f84db2009-03-12 20:39:13 +0000711 for assigned_host in group_hosts[1:]:
712 # Create a new HQE for every additional assigned_host.
jamesrenc44ae992010-02-19 00:12:54 +0000713 new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000714 new_hqe.save()
jamesren883492a2010-02-12 00:45:18 +0000715 new_hqe.set_host(assigned_host)
716 self._run_queue_entry(new_hqe)
717
718 # The first assigned host uses the original HostQueueEntry
719 queue_entry.set_host(group_hosts[0])
720 self._run_queue_entry(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000721
722
showarda9545c02009-12-18 22:44:26 +0000723 def _schedule_hostless_job(self, queue_entry):
724 self.add_agent_task(HostlessQueueTask(queue_entry))
jamesren47bd7372010-03-13 00:58:17 +0000725 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000726
727
showard89f84db2009-03-12 20:39:13 +0000728 def _schedule_new_jobs(self):
729 queue_entries = self._refresh_pending_queue_entries()
730 if not queue_entries:
731 return
732
Simran Basi3f6717d2012-09-13 15:21:22 -0700733 logging.debug('Processing %d queue_entries', len(queue_entries))
showard63a34772008-08-18 19:32:50 +0000734 for queue_entry in queue_entries:
Simran Basidef92872012-09-20 13:34:34 -0700735 self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
showarde55955f2009-10-07 20:48:58 +0000736 is_unassigned_atomic_group = (
737 queue_entry.atomic_group_id is not None
738 and queue_entry.host_id is None)
jamesren883492a2010-02-12 00:45:18 +0000739
740 if queue_entry.is_hostless():
Simran Basidef92872012-09-20 13:34:34 -0700741 self._log_extra_msg('Scheduling hostless job.')
showarda9545c02009-12-18 22:44:26 +0000742 self._schedule_hostless_job(queue_entry)
jamesren883492a2010-02-12 00:45:18 +0000743 elif is_unassigned_atomic_group:
744 self._schedule_atomic_group(queue_entry)
showarde55955f2009-10-07 20:48:58 +0000745 else:
jamesren883492a2010-02-12 00:45:18 +0000746 assigned_host = self._host_scheduler.schedule_entry(queue_entry)
showard65db3932009-10-28 19:54:35 +0000747 if assigned_host and not self.host_has_agent(assigned_host):
jamesren883492a2010-02-12 00:45:18 +0000748 assert assigned_host.id == queue_entry.host_id
749 self._run_queue_entry(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000750
751
showard8cc058f2009-09-08 16:26:33 +0000752 def _schedule_running_host_queue_entries(self):
showardd1195652009-12-08 22:21:02 +0000753 for agent_task in self._get_queue_entry_agent_tasks():
754 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000755
756
757 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000758 for entry in scheduler_models.HostQueueEntry.fetch(
759 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000760 task = entry.job.schedule_delayed_callback_task(entry)
761 if task:
showardd1195652009-12-08 22:21:02 +0000762 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000763
764
jamesren883492a2010-02-12 00:45:18 +0000765 def _run_queue_entry(self, queue_entry):
Simran Basidef92872012-09-20 13:34:34 -0700766 self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
767 queue_entry)
jamesren883492a2010-02-12 00:45:18 +0000768 queue_entry.schedule_pre_job_tasks()
mblighd5c95802008-03-05 00:33:46 +0000769
770
jadmanski0afbb632008-06-06 21:10:57 +0000771 def _find_aborting(self):
jamesrene7c65cb2010-06-08 20:38:10 +0000772 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000773 for entry in scheduler_models.HostQueueEntry.fetch(
774 where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +0000775 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +0000776 for agent in self.get_agents_for_entry(entry):
777 agent.abort()
778 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000779 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700780 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000781 for job in jobs_to_stop:
782 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000783
784
showard324bf812009-01-20 23:23:38 +0000785 def _can_start_agent(self, agent, num_started_this_cycle,
786 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000787 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000788 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000789 return True
790 # don't allow any nonzero-process agents to run after we've reached a
791 # limit (this avoids starvation of many-process agents)
792 if have_reached_limit:
793 return False
794 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000795 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000796 agent.task.owner_username,
797 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000798 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000799 return False
800 # if a single agent exceeds the per-cycle throttling, still allow it to
801 # run when it's the first agent in the cycle
802 if num_started_this_cycle == 0:
803 return True
804 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +0000805 if (num_started_this_cycle + agent.task.num_processes >
806 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000807 return False
808 return True
809
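    # Worked example of the throttling above (numbers are hypothetical): with
    # max_processes_started_per_cycle = 50, an agent whose task needs 20
    # processes may start while num_started_this_cycle <= 30; an agent
    # needing 60 processes fails the per-cycle check but is still allowed to
    # start if it is the first agent of the cycle (num_started_this_cycle ==
    # 0). In both cases the drone manager's max_runnable_processes limit is
    # checked first.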
810
jadmanski0afbb632008-06-06 21:10:57 +0000811 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000812 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000813 have_reached_limit = False
814 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -0700815 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +0000816 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -0700817 self._log_extra_msg('Processing Agent with Host Ids: %s and '
818 'queue_entry ids:%s' % (agent.host_ids,
819 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +0000820 if not agent.started:
showard324bf812009-01-20 23:23:38 +0000821 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000822 have_reached_limit):
823 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -0700824 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +0000825 continue
showardd1195652009-12-08 22:21:02 +0000826 num_started_this_cycle += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -0700827 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +0000828 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -0700829 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +0000830 if agent.is_done():
Simran Basidef92872012-09-20 13:34:34 -0700831 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +0000832 self.remove_agent(agent)
Simran Basi3f6717d2012-09-13 15:21:22 -0700833 logging.info('%d running processes. %d added this cycle.',
834 _drone_manager.total_running_processes(),
835 num_started_this_cycle)
mbligh36768f02008-02-22 18:28:33 +0000836
837
showard29f7cd22009-04-29 21:16:24 +0000838 def _process_recurring_runs(self):
839 recurring_runs = models.RecurringRun.objects.filter(
840 start_date__lte=datetime.datetime.now())
841 for rrun in recurring_runs:
842 # Create job from template
843 job = rrun.job
844 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +0000845 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +0000846
847 host_objects = info['hosts']
848 one_time_hosts = info['one_time_hosts']
849 metahost_objects = info['meta_hosts']
850 dependencies = info['dependencies']
851 atomic_group = info['atomic_group']
852
853 for host in one_time_hosts or []:
854 this_host = models.Host.create_one_time_host(host.hostname)
855 host_objects.append(this_host)
856
857 try:
858 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +0000859 options=options,
showard29f7cd22009-04-29 21:16:24 +0000860 host_objects=host_objects,
861 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +0000862 atomic_group=atomic_group)
863
864 except Exception, ex:
865 logging.exception(ex)
866 #TODO send email
867
868 if rrun.loop_count == 1:
869 rrun.delete()
870 else:
871 if rrun.loop_count != 0: # if not infinite loop
872 # calculate new start_date
873 difference = datetime.timedelta(seconds=rrun.loop_period)
874 rrun.start_date = rrun.start_date + difference
875 rrun.loop_count -= 1
876 rrun.save()
877
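    # Example of the rescheduling arithmetic above: a recurring run with
    # loop_period=3600 and loop_count=3 is resubmitted as a job, its
    # start_date is pushed one hour into the future, and loop_count drops to
    # 2; once loop_count reaches 1, the RecurringRun row is deleted after its
    # final job is created.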
878
Simran Basia858a232012-08-21 11:04:37 -0700879SiteDispatcher = utils.import_site_class(
880 __file__, 'autotest_lib.scheduler.site_monitor_db',
881 'SiteDispatcher', BaseDispatcher)
882
883class Dispatcher(SiteDispatcher):
884 pass
885
886
showard170873e2009-01-07 00:22:26 +0000887class PidfileRunMonitor(object):
888 """
889 Client must call either run() to start a new process or
890 attach_to_existing_process().
891 """
mbligh36768f02008-02-22 18:28:33 +0000892
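    # Rough usage sketch (illustrative only; the variables passed in stand
    # for whatever the calling AgentTask has on hand):
    #
    #     monitor = PidfileRunMonitor()
    #     monitor.run(command, working_directory, num_processes=1,
    #                 nice_level=AUTOSERV_NICE_LEVEL,
    #                 pidfile_name=drone_manager.AUTOSERV_PID_FILE,
    #                 username=owner)
    #     ...
    #     if monitor.exit_code() == 0:
    #         # process finished and reported success
    #         ...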
showard170873e2009-01-07 00:22:26 +0000893 class _PidfileException(Exception):
894 """
895 Raised when there's some unexpected behavior with the pid file, but only
896 used internally (never allowed to escape this class).
897 """
mbligh36768f02008-02-22 18:28:33 +0000898
899
showard170873e2009-01-07 00:22:26 +0000900 def __init__(self):
showard35162b02009-03-03 02:17:30 +0000901 self.lost_process = False
showard170873e2009-01-07 00:22:26 +0000902 self._start_time = None
903 self.pidfile_id = None
904 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +0000905
906
showard170873e2009-01-07 00:22:26 +0000907 def _add_nice_command(self, command, nice_level):
908 if not nice_level:
909 return command
910 return ['nice', '-n', str(nice_level)] + command
911
912
913 def _set_start_time(self):
914 self._start_time = time.time()
915
916
showard418785b2009-11-23 20:19:59 +0000917 def run(self, command, working_directory, num_processes, nice_level=None,
918 log_file=None, pidfile_name=None, paired_with_pidfile=None,
jamesren76fcf192010-04-21 20:39:50 +0000919 username=None, drone_hostnames_allowed=None):
showard170873e2009-01-07 00:22:26 +0000920 assert command is not None
921 if nice_level is not None:
922 command = ['nice', '-n', str(nice_level)] + command
923 self._set_start_time()
924 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +0000925 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +0000926 num_processes=num_processes, log_file=log_file,
jamesren76fcf192010-04-21 20:39:50 +0000927 paired_with_pidfile=paired_with_pidfile, username=username,
928 drone_hostnames_allowed=drone_hostnames_allowed)
showard170873e2009-01-07 00:22:26 +0000929
930
showarded2afea2009-07-07 20:54:07 +0000931 def attach_to_existing_process(self, execution_path,
jamesrenc44ae992010-02-19 00:12:54 +0000932 pidfile_name=drone_manager.AUTOSERV_PID_FILE,
showardd1195652009-12-08 22:21:02 +0000933 num_processes=None):
showard170873e2009-01-07 00:22:26 +0000934 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +0000935 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +0000936 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +0000937 if num_processes is not None:
938 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +0000939
940
jadmanski0afbb632008-06-06 21:10:57 +0000941 def kill(self):
showard170873e2009-01-07 00:22:26 +0000942 if self.has_process():
943 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +0000944
mbligh36768f02008-02-22 18:28:33 +0000945
showard170873e2009-01-07 00:22:26 +0000946 def has_process(self):
showard21baa452008-10-21 00:08:39 +0000947 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +0000948 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +0000949
950
showard170873e2009-01-07 00:22:26 +0000951 def get_process(self):
showard21baa452008-10-21 00:08:39 +0000952 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +0000953 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +0000954 return self._state.process
mblighbb421852008-03-11 22:36:16 +0000955
956
showard170873e2009-01-07 00:22:26 +0000957 def _read_pidfile(self, use_second_read=False):
958 assert self.pidfile_id is not None, (
959 'You must call run() or attach_to_existing_process()')
960 contents = _drone_manager.get_pidfile_contents(
961 self.pidfile_id, use_second_read=use_second_read)
962 if contents.is_invalid():
963 self._state = drone_manager.PidfileContents()
964 raise self._PidfileException(contents)
965 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +0000966
967
showard21baa452008-10-21 00:08:39 +0000968 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +0000969 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
970 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +0000971 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +0000972 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +0000973
974
975 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +0000976 if self.lost_process:
showard21baa452008-10-21 00:08:39 +0000977 return
mblighbb421852008-03-11 22:36:16 +0000978
showard21baa452008-10-21 00:08:39 +0000979 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +0000980
showard170873e2009-01-07 00:22:26 +0000981 if self._state.process is None:
982 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +0000983 return
mbligh90a549d2008-03-25 23:52:34 +0000984
showard21baa452008-10-21 00:08:39 +0000985 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +0000986 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +0000987 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +0000988 return
mbligh90a549d2008-03-25 23:52:34 +0000989
showard170873e2009-01-07 00:22:26 +0000990 # pid but no running process - maybe process *just* exited
991 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +0000992 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +0000993 # autoserv exited without writing an exit code
994 # to the pidfile
showard21baa452008-10-21 00:08:39 +0000995 self._handle_pidfile_error(
996 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +0000997
showard21baa452008-10-21 00:08:39 +0000998
999 def _get_pidfile_info(self):
1000 """\
1001 After completion, self._state will contain:
1002 pid=None, exit_status=None if autoserv has not yet run
1003 pid!=None, exit_status=None if autoserv is running
1004 pid!=None, exit_status!=None if autoserv has completed
1005 """
1006 try:
1007 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001008 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001009 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001010
1011
showard170873e2009-01-07 00:22:26 +00001012 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001013 """\
1014 Called when no pidfile is found or no pid is in the pidfile.
1015 """
showard170873e2009-01-07 00:22:26 +00001016 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001017 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001018 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001019 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001020 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001021
1022
showard35162b02009-03-03 02:17:30 +00001023 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001024 """\
1025 Called when autoserv has exited without writing an exit status,
1026 or we've timed out waiting for autoserv to write a pid to the
1027 pidfile. In either case, we just return failure and the caller
1028 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001029
showard170873e2009-01-07 00:22:26 +00001030 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001031 """
1032 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001033 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001034 self._state.exit_status = 1
1035 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001036
1037
jadmanski0afbb632008-06-06 21:10:57 +00001038 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001039 self._get_pidfile_info()
1040 return self._state.exit_status
1041
1042
1043 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001044 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001045 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001046 if self._state.num_tests_failed is None:
1047 return -1
showard21baa452008-10-21 00:08:39 +00001048 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001049
1050
showardcdaeae82009-08-31 18:32:48 +00001051 def try_copy_results_on_drone(self, **kwargs):
1052 if self.has_process():
1053 # copy results logs into the normal place for job results
1054 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1055
1056
1057 def try_copy_to_results_repository(self, source, **kwargs):
1058 if self.has_process():
1059 _drone_manager.copy_to_results_repository(self.get_process(),
1060 source, **kwargs)
1061
1062
mbligh36768f02008-02-22 18:28:33 +00001063class Agent(object):
showard77182562009-06-10 00:16:05 +00001064 """
showard8cc058f2009-09-08 16:26:33 +00001065 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001066
1067 The following methods are required on all task objects:
1068 poll() - Called periodically to let the task check its status and
 1069 update its internal state, including whether the task succeeded.
1070 is_done() - Returns True if the task is finished.
1071 abort() - Called when an abort has been requested. The task must
1072 set its aborted attribute to True if it actually aborted.
1073
1074 The following attributes are required on all task objects:
1075 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001076 success - bool, True if this task succeeded.
1077 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1078 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001079 """
1080
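    # Minimal sketch of an object satisfying the task interface described
    # above (purely illustrative; concrete tasks in this scheduler build on
    # BaseAgentTask further below, and the Dispatcher additionally consults
    # task.num_processes and ownership attributes for throttling):
    #
    #     class NoopTask(object):
    #         queue_entry_ids = ()
    #         host_ids = ()
    #         aborted = False
    #         success = None
    #
    #         def poll(self):
    #             self.success = True
    #
    #         def is_done(self):
    #             return self.success is not None
    #
    #         def abort(self):
    #             self.aborted = True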
1081
showard418785b2009-11-23 20:19:59 +00001082 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001083 """
showard8cc058f2009-09-08 16:26:33 +00001084 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001085 """
showard8cc058f2009-09-08 16:26:33 +00001086 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001087
showard77182562009-06-10 00:16:05 +00001088 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001089 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001090
showard8cc058f2009-09-08 16:26:33 +00001091 self.queue_entry_ids = task.queue_entry_ids
1092 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001093
showard8cc058f2009-09-08 16:26:33 +00001094 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001095 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001096
1097
jadmanski0afbb632008-06-06 21:10:57 +00001098 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001099 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001100 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001101 self.task.poll()
1102 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001103 self.finished = True
showardec113162008-05-08 00:52:49 +00001104
1105
jadmanski0afbb632008-06-06 21:10:57 +00001106 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001107 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001108
1109
showardd3dc1992009-04-22 21:01:40 +00001110 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001111 if self.task:
1112 self.task.abort()
1113 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001114 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001115 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001116
showardd3dc1992009-04-22 21:01:40 +00001117
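# Illustrative sketch only -- not part of the scheduler and never instantiated
# by it. It shows the minimal shape of a task object satisfying the contract
# described in the Agent docstring above (poll/is_done/abort plus the aborted,
# success, queue_entry_ids and host_ids attributes). The class name and its
# trivial behavior are hypothetical.
class _ExampleNoopTask(object):
    def __init__(self):
        self.aborted = False
        self.success = None
        self.queue_entry_ids = []
        self.host_ids = []
        self._done = False


    def poll(self):
        # a real task would check on its process here; this one finishes
        # immediately and reports success
        self._done = True
        self.success = True


    def is_done(self):
        return self._done


    def abort(self):
        self.aborted = True
        self._done = True
# Usage sketch: the dispatcher would wrap such a task as
# Agent(_ExampleNoopTask()) and call tick() until is_done() returns True.

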
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001118class BaseAgentTask(object):
showardd1195652009-12-08 22:21:02 +00001119 class _NullMonitor(object):
1120 pidfile_id = None
1121
1122 def has_process(self):
1123 return True
1124
1125
1126 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001127 """
showardd1195652009-12-08 22:21:02 +00001128 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001129 """
jadmanski0afbb632008-06-06 21:10:57 +00001130 self.done = False
showardd1195652009-12-08 22:21:02 +00001131 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001132 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001133 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001134 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001135 self.queue_entry_ids = []
1136 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001137 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001138
1139
1140 def _set_ids(self, host=None, queue_entries=None):
1141 if queue_entries and queue_entries != [None]:
1142 self.host_ids = [entry.host.id for entry in queue_entries]
1143 self.queue_entry_ids = [entry.id for entry in queue_entries]
1144 else:
1145 assert host
1146 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001147
1148
jadmanski0afbb632008-06-06 21:10:57 +00001149 def poll(self):
showard08a36412009-05-05 01:01:13 +00001150 if not self.started:
1151 self.start()
showardd1195652009-12-08 22:21:02 +00001152 if not self.done:
1153 self.tick()
showard08a36412009-05-05 01:01:13 +00001154
1155
1156 def tick(self):
showardd1195652009-12-08 22:21:02 +00001157 assert self.monitor
1158 exit_code = self.monitor.exit_code()
1159 if exit_code is None:
1160 return
mbligh36768f02008-02-22 18:28:33 +00001161
showardd1195652009-12-08 22:21:02 +00001162 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001163 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001164
1165
jadmanski0afbb632008-06-06 21:10:57 +00001166 def is_done(self):
1167 return self.done
mbligh36768f02008-02-22 18:28:33 +00001168
1169
jadmanski0afbb632008-06-06 21:10:57 +00001170 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001171 if self.done:
showardd1195652009-12-08 22:21:02 +00001172 assert self.started
showard08a36412009-05-05 01:01:13 +00001173 return
showardd1195652009-12-08 22:21:02 +00001174 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001175 self.done = True
1176 self.success = success
1177 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001178
1179
jadmanski0afbb632008-06-06 21:10:57 +00001180 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001181 """
1182 To be overridden.
1183 """
showarded2afea2009-07-07 20:54:07 +00001184 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001185 self.register_necessary_pidfiles()
1186
1187
1188 def _log_file(self):
1189 if not self._log_file_name:
1190 return None
1191 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001192
mbligh36768f02008-02-22 18:28:33 +00001193
jadmanski0afbb632008-06-06 21:10:57 +00001194 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001195 log_file = self._log_file()
1196 if self.monitor and log_file:
1197 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001198
1199
jadmanski0afbb632008-06-06 21:10:57 +00001200 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001201 """
1202 To be overridden.
1203 """
jadmanski0afbb632008-06-06 21:10:57 +00001204 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001205 logging.info("%s finished with success=%s", type(self).__name__,
1206 self.success)
1207
mbligh36768f02008-02-22 18:28:33 +00001208
1209
jadmanski0afbb632008-06-06 21:10:57 +00001210 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001211 if not self.started:
1212 self.prolog()
1213 self.run()
1214
1215 self.started = True
1216
1217
1218 def abort(self):
1219 if self.monitor:
1220 self.monitor.kill()
1221 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001222 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001223 self.cleanup()
1224
1225
showarded2afea2009-07-07 20:54:07 +00001226 def _get_consistent_execution_path(self, execution_entries):
1227 first_execution_path = execution_entries[0].execution_path()
1228 for execution_entry in execution_entries[1:]:
1229 assert execution_entry.execution_path() == first_execution_path, (
1230 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1231 execution_entry,
1232 first_execution_path,
1233 execution_entries[0]))
1234 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001235
1236
showarded2afea2009-07-07 20:54:07 +00001237 def _copy_results(self, execution_entries, use_monitor=None):
1238 """
1239 @param execution_entries: list of objects with execution_path() method
1240 """
showard6d1c1432009-08-20 23:30:39 +00001241 if use_monitor is not None and not use_monitor.has_process():
1242 return
1243
showarded2afea2009-07-07 20:54:07 +00001244 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001245 if use_monitor is None:
1246 assert self.monitor
1247 use_monitor = self.monitor
1248 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001249 execution_path = self._get_consistent_execution_path(execution_entries)
1250 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001251 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001252
showarda1e74b32009-05-12 17:32:04 +00001253
1254 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001255 for queue_entry in queue_entries:
1256 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001257
1258
mbligh4608b002010-01-05 18:22:35 +00001259 def _archive_results(self, queue_entries):
1260 for queue_entry in queue_entries:
1261 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001262
1263
showardd1195652009-12-08 22:21:02 +00001264 def _command_line(self):
1265 """
1266 Return the command line to run. Must be overridden.
1267 """
1268 raise NotImplementedError
1269
1270
1271 @property
1272 def num_processes(self):
1273 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001274 Return the number of processes forked by this BaseAgentTask's process.
1275 It may only be approximate. To be overridden if necessary.
showardd1195652009-12-08 22:21:02 +00001276 """
1277 return 1
1278
1279
1280 def _paired_with_monitor(self):
1281 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001282 If this BaseAgentTask's process must run on the same machine as some
showardd1195652009-12-08 22:21:02 +00001283 previous process, this method should be overridden to return a
1284 PidfileRunMonitor for that process.
1285 """
1286 return self._NullMonitor()
1287
1288
1289 @property
1290 def owner_username(self):
1291 """
1292 Return login of user responsible for this task. May be None. Must be
1293 overridden.
1294 """
1295 raise NotImplementedError
1296
1297
1298 def _working_directory(self):
1299 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001300 Return the directory where this BaseAgentTask's process executes.
1301 Must be overridden.
showardd1195652009-12-08 22:21:02 +00001302 """
1303 raise NotImplementedError
1304
1305
1306 def _pidfile_name(self):
1307 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001308 Return the name of the pidfile this BaseAgentTask's process uses. To be
showardd1195652009-12-08 22:21:02 +00001309 overridden if necessary.
1310 """
jamesrenc44ae992010-02-19 00:12:54 +00001311 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001312
1313
1314 def _check_paired_results_exist(self):
1315 if not self._paired_with_monitor().has_process():
1316 email_manager.manager.enqueue_notify_email(
1317 'No paired results in task',
1318 'No paired results in task %s at %s'
1319 % (self, self._paired_with_monitor().pidfile_id))
1320 self.finished(False)
1321 return False
1322 return True
1323
1324
1325 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001326 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001327 self.monitor = PidfileRunMonitor()
1328
1329
1330 def run(self):
1331 if not self._check_paired_results_exist():
1332 return
1333
1334 self._create_monitor()
1335 self.monitor.run(
1336 self._command_line(), self._working_directory(),
1337 num_processes=self.num_processes,
1338 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1339 pidfile_name=self._pidfile_name(),
1340 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001341 username=self.owner_username,
1342 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1343
1344
1345 def get_drone_hostnames_allowed(self):
1346 if not models.DroneSet.drone_sets_enabled():
1347 return None
1348
1349 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1350 if not hqes:
1351 # Only special tasks could be missing host queue entries
1352 assert isinstance(self, SpecialAgentTask)
1353 return self._user_or_global_default_drone_set(
1354 self.task, self.task.requested_by)
1355
1356 job_ids = hqes.values_list('job', flat=True).distinct()
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001357 assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
jamesren76fcf192010-04-21 20:39:50 +00001358 "span multiple jobs")
1359
1360 job = models.Job.objects.get(id=job_ids[0])
1361 drone_set = job.drone_set
1362 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001363 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001364
1365 return drone_set.get_drone_hostnames()
1366
1367
1368 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1369 """
1370 Returns the user's default drone set, if present.
1371
1372 Otherwise, returns the global default drone set.
1373 """
1374 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1375 if not user:
1376 logging.warn('%s had no owner; using default drone set',
1377 obj_with_owner)
1378 return default_hostnames
1379 if not user.drone_set:
1380 logging.warn('User %s has no default drone set, using global '
1381 'default', user.login)
1382 return default_hostnames
1383 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001384
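    # Summary of the resolution implemented by the two methods above: use the
    # job's own drone set when it has one; otherwise fall back to the
    # requesting user's default drone set; otherwise fall back to the global
    # default drone set.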
1385
1386 def register_necessary_pidfiles(self):
1387 pidfile_id = _drone_manager.get_pidfile_id_from(
1388 self._working_directory(), self._pidfile_name())
1389 _drone_manager.register_pidfile(pidfile_id)
1390
1391 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1392 if paired_pidfile_id:
1393 _drone_manager.register_pidfile(paired_pidfile_id)
1394
1395
1396 def recover(self):
1397 if not self._check_paired_results_exist():
1398 return
1399
1400 self._create_monitor()
1401 self.monitor.attach_to_existing_process(
1402 self._working_directory(), pidfile_name=self._pidfile_name(),
1403 num_processes=self.num_processes)
1404 if not self.monitor.has_process():
1405 # no process to recover; wait to be started normally
1406 self.monitor = None
1407 return
1408
1409 self.started = True
1410 logging.info('Recovering process %s for %s at %s'
1411 % (self.monitor.get_process(), type(self).__name__,
1412 self._working_directory()))
mbligh36768f02008-02-22 18:28:33 +00001413
1414
mbligh4608b002010-01-05 18:22:35 +00001415 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1416 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001417 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001418 for entry in queue_entries:
1419 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001420 raise host_scheduler.SchedulerError(
1421 '%s attempting to start entry with invalid status %s: '
1422 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001423 invalid_host_status = (
1424 allowed_host_statuses is not None
1425 and entry.host.status not in allowed_host_statuses)
1426 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001427 raise host_scheduler.SchedulerError(
1428 '%s attempting to start on queue entry with invalid '
1429 'host status %s: %s'
1430 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001431
1432
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001433SiteAgentTask = utils.import_site_class(
1434 __file__, 'autotest_lib.scheduler.site_monitor_db',
1435 'SiteAgentTask', BaseAgentTask)
1436
1437class AgentTask(SiteAgentTask):
1438 pass
1439
1440
showardd9205182009-04-27 20:09:55 +00001441class TaskWithJobKeyvals(object):
1442 """AgentTask mixin providing functionality to help with job keyval files."""
1443 _KEYVAL_FILE = 'keyval'
1444 def _format_keyval(self, key, value):
1445 return '%s=%s' % (key, value)
1446
1447
1448 def _keyval_path(self):
1449 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001450 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001451
1452
1453 def _write_keyval_after_job(self, field, value):
1454 assert self.monitor
1455 if not self.monitor.has_process():
1456 return
1457 _drone_manager.write_lines_to_file(
1458 self._keyval_path(), [self._format_keyval(field, value)],
1459 paired_with_process=self.monitor.get_process())
1460
1461
1462 def _job_queued_keyval(self, job):
1463 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1464
1465
1466 def _write_job_finished(self):
1467 self._write_keyval_after_job("job_finished", int(time.time()))
1468
1469
showarddb502762009-09-09 15:31:20 +00001470 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1471 keyval_contents = '\n'.join(self._format_keyval(key, value)
1472 for key, value in keyval_dict.iteritems())
1473 # always end with a newline to allow additional keyvals to be written
1474 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001475 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001476 keyval_contents,
1477 file_path=keyval_path)
1478
1479
1480 def _write_keyvals_before_job(self, keyval_dict):
1481 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1482
1483
1484 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001485 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001486 host.hostname)
1487 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001488 all_labels = [ urllib.quote(label) for label in all_labels ]
showarddb502762009-09-09 15:31:20 +00001489 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1490 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1491
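    # Example of the keyval file format produced by the helpers above (values
    # are hypothetical): each keyval is a single "key=value" line, so a host
    # keyvals file might read
    #
    #   platform=my_platform
    #   labels=label_a,label_b
    #
    # while the job keyval file gains lines such as job_queued=1262646000
    # before the job runs and job_finished=1262649600 after it.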
1492
showard8cc058f2009-09-08 16:26:33 +00001493class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001494 """
1495 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1496 """
1497
1498 TASK_TYPE = None
1499 host = None
1500 queue_entry = None
1501
showardd1195652009-12-08 22:21:02 +00001502 def __init__(self, task, extra_command_args):
1503 super(SpecialAgentTask, self).__init__()
1504
lmrb7c5d272010-04-16 06:34:04 +00001505 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001506
jamesrenc44ae992010-02-19 00:12:54 +00001507 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001508 self.queue_entry = None
1509 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001510 self.queue_entry = scheduler_models.HostQueueEntry(
1511 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001512
showarded2afea2009-07-07 20:54:07 +00001513 self.task = task
1514 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001515
1516
showard8cc058f2009-09-08 16:26:33 +00001517 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001518 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1519
1520
1521 def _command_line(self):
1522 return _autoserv_command_line(self.host.hostname,
1523 self._extra_command_args,
1524 queue_entry=self.queue_entry)
1525
1526
1527 def _working_directory(self):
1528 return self.task.execution_path()
1529
1530
1531 @property
1532 def owner_username(self):
1533 if self.task.requested_by:
1534 return self.task.requested_by.login
1535 return None
showard8cc058f2009-09-08 16:26:33 +00001536
1537
showarded2afea2009-07-07 20:54:07 +00001538 def prolog(self):
1539 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001540 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001541 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001542
1543
showardde634ee2009-01-30 01:44:24 +00001544 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001545 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001546
showard2fe3f1d2009-07-06 20:19:11 +00001547 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001548 return # don't fail metahost entries, they'll be reassigned
1549
showard2fe3f1d2009-07-06 20:19:11 +00001550 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001551 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001552 return # entry has been aborted
1553
showard2fe3f1d2009-07-06 20:19:11 +00001554 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001555 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001556 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001557 self._write_keyval_after_job(queued_key, queued_time)
1558 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001559
showard8cc058f2009-09-08 16:26:33 +00001560 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001561 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001562 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001563 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001564
showard8cc058f2009-09-08 16:26:33 +00001565 pidfile_id = _drone_manager.get_pidfile_id_from(
1566 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001567 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001568 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001569
1570 if self.queue_entry.job.parse_failed_repair:
1571 self._parse_results([self.queue_entry])
1572 else:
1573 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001574
1575
1576 def cleanup(self):
1577 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001578
1579 # We will consider an aborted task to be "Failed"
1580 self.task.finish(bool(self.success))
1581
showardf85a0b72009-10-07 20:48:45 +00001582 if self.monitor:
1583 if self.monitor.has_process():
1584 self._copy_results([self.task])
1585 if self.monitor.pidfile_id is not None:
1586 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001587
1588
1589class RepairTask(SpecialAgentTask):
1590 TASK_TYPE = models.SpecialTask.Task.REPAIR
1591
1592
showardd1195652009-12-08 22:21:02 +00001593 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001594 """\
1595         @param task: the repair SpecialTask to run; its queue entry (if any) is marked failed when this repair fails.
1596 """
1597 protection = host_protections.Protection.get_string(
1598 task.host.protection)
1599 # normalize the protection name
1600 protection = host_protections.Protection.get_attr_name(protection)
1601
1602 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001603 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001604
1605 # *don't* include the queue entry in IDs -- if the queue entry is
1606 # aborted, we want to leave the repair task running
1607 self._set_ids(host=self.host)
1608
1609
1610 def prolog(self):
1611 super(RepairTask, self).prolog()
1612 logging.info("repair_task starting")
1613 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001614
1615
jadmanski0afbb632008-06-06 21:10:57 +00001616 def epilog(self):
1617 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001618
jadmanski0afbb632008-06-06 21:10:57 +00001619 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001620 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001621 else:
showard8cc058f2009-09-08 16:26:33 +00001622 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001623 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001624 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001625
1626
showarded2afea2009-07-07 20:54:07 +00001627class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001628 def _copy_to_results_repository(self):
1629 if not self.queue_entry or self.queue_entry.meta_host:
1630 return
1631
1632 self.queue_entry.set_execution_subdir()
1633 log_name = os.path.basename(self.task.execution_path())
1634 source = os.path.join(self.task.execution_path(), 'debug',
1635 'autoserv.DEBUG')
1636 destination = os.path.join(
1637 self.queue_entry.execution_path(), log_name)
1638
1639 self.monitor.try_copy_to_results_repository(
1640 source, destination_path=destination)
1641
1642
showard170873e2009-01-07 00:22:26 +00001643 def epilog(self):
1644 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001645
showard775300b2009-09-09 15:30:50 +00001646 if self.success:
1647 return
showard8fe93b52008-11-18 17:53:22 +00001648
showard775300b2009-09-09 15:30:50 +00001649 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001650
showard775300b2009-09-09 15:30:50 +00001651 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001652 # effectively ignore failure for these hosts
1653 self.success = True
showard775300b2009-09-09 15:30:50 +00001654 return
1655
1656 if self.queue_entry:
1657 self.queue_entry.requeue()
1658
1659 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001660 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001661 queue_entry__id=self.queue_entry.id):
1662 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1663 self._fail_queue_entry()
1664 return
1665
showard9bb960b2009-11-19 01:02:11 +00001666 queue_entry = models.HostQueueEntry.objects.get(
1667 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001668 else:
1669 queue_entry = None
1670
1671 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001672 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001673 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001674 queue_entry=queue_entry,
1675 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001676
showard8fe93b52008-11-18 17:53:22 +00001677
1678class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001679 TASK_TYPE = models.SpecialTask.Task.VERIFY
1680
1681
showardd1195652009-12-08 22:21:02 +00001682 def __init__(self, task):
1683 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001684 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001685
1686
jadmanski0afbb632008-06-06 21:10:57 +00001687 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001688 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001689
showardb18134f2009-03-20 20:52:18 +00001690 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001691 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001692 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1693 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001694
jamesren42318f72010-05-10 23:40:59 +00001695 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00001696 # and there's no need to keep records of other requests.
1697 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00001698 host__id=self.host.id,
1699 task=models.SpecialTask.Task.VERIFY,
jamesren42318f72010-05-10 23:40:59 +00001700 is_active=False, is_complete=False, queue_entry=None)
showarded2afea2009-07-07 20:54:07 +00001701 queued_verifies = queued_verifies.exclude(id=self.task.id)
1702 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00001703
mbligh36768f02008-02-22 18:28:33 +00001704
jadmanski0afbb632008-06-06 21:10:57 +00001705 def epilog(self):
1706 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001707 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001708 if self.queue_entry:
1709 self.queue_entry.on_pending()
1710 else:
1711 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001712
1713
mbligh4608b002010-01-05 18:22:35 +00001714class CleanupTask(PreJobTask):
1715 # note this can also run post-job, but when it does, it's running standalone
1716 # against the host (not related to the job), so it's not considered a
1717 # PostJobTask
1718
1719 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1720
1721
1722 def __init__(self, task, recover_run_monitor=None):
1723 super(CleanupTask, self).__init__(task, ['--cleanup'])
1724 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1725
1726
1727 def prolog(self):
1728 super(CleanupTask, self).prolog()
1729 logging.info("starting cleanup task for host: %s", self.host.hostname)
1730 self.host.set_status(models.Host.Status.CLEANING)
1731 if self.queue_entry:
1732 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1733
1734
1735 def _finish_epilog(self):
1736 if not self.queue_entry or not self.success:
1737 return
1738
1739 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
1740 should_run_verify = (
1741 self.queue_entry.job.run_verify
1742 and self.host.protection != do_not_verify_protection)
1743 if should_run_verify:
1744 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1745 models.SpecialTask.objects.create(
1746 host=models.Host.objects.get(id=self.host.id),
1747 queue_entry=entry,
1748 task=models.SpecialTask.Task.VERIFY)
1749 else:
1750 self.queue_entry.on_pending()
1751
1752
1753 def epilog(self):
1754 super(CleanupTask, self).epilog()
1755
1756 if self.success:
1757 self.host.update_field('dirty', 0)
1758 self.host.set_status(models.Host.Status.READY)
1759
1760 self._finish_epilog()
1761
1762
showarda9545c02009-12-18 22:44:26 +00001763class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
1764 """
1765 Common functionality for QueueTask and HostlessQueueTask
1766 """
1767 def __init__(self, queue_entries):
1768 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001769 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001770 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001771
1772
showard73ec0442009-02-07 02:05:20 +00001773 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001774 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001775
1776
jamesrenc44ae992010-02-19 00:12:54 +00001777 def _write_control_file(self, execution_path):
1778 control_path = _drone_manager.attach_file_to_execution(
1779 execution_path, self.job.control_file)
1780 return control_path
1781
1782
showardd1195652009-12-08 22:21:02 +00001783 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001784 execution_path = self.queue_entries[0].execution_path()
1785 control_path = self._write_control_file(execution_path)
1786 hostnames = ','.join(entry.host.hostname
1787 for entry in self.queue_entries
1788 if not entry.is_hostless())
1789
1790 execution_tag = self.queue_entries[0].execution_tag()
1791 params = _autoserv_command_line(
1792 hostnames,
1793 ['-P', execution_tag, '-n',
1794 _drone_manager.absolute_path(control_path)],
1795 job=self.job, verbose=False)
1796
1797 if not self.job.is_server_job():
1798 params.append('-c')
1799
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001800 if self.job.is_image_update_job():
1801 params += ['--image', self.job.update_image_path]
1802
jamesrenc44ae992010-02-19 00:12:54 +00001803 return params
showardd1195652009-12-08 22:21:02 +00001804
1805
1806 @property
1807 def num_processes(self):
1808 return len(self.queue_entries)
1809
1810
1811 @property
1812 def owner_username(self):
1813 return self.job.owner
1814
1815
1816 def _working_directory(self):
1817 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001818
1819
jadmanski0afbb632008-06-06 21:10:57 +00001820 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001821 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001822 keyval_dict = self.job.keyval_dict()
1823 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001824 group_name = self.queue_entries[0].get_group_name()
1825 if group_name:
1826 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001827 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001828 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001829 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001830 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001831
1832
showard35162b02009-03-03 02:17:30 +00001833 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001834 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001835 _drone_manager.write_lines_to_file(error_file_path,
1836 [_LOST_PROCESS_ERROR])
1837
1838
showardd3dc1992009-04-22 21:01:40 +00001839 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001840 if not self.monitor:
1841 return
1842
showardd9205182009-04-27 20:09:55 +00001843 self._write_job_finished()
1844
showard35162b02009-03-03 02:17:30 +00001845 if self.monitor.lost_process:
1846 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001847
jadmanskif7fa2cc2008-10-01 14:13:23 +00001848
showardcbd74612008-11-19 21:42:02 +00001849 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001850 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001851 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001852 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001853 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001854
1855
jadmanskif7fa2cc2008-10-01 14:13:23 +00001856 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001857 if not self.monitor or not self.monitor.has_process():
1858 return
1859
jadmanskif7fa2cc2008-10-01 14:13:23 +00001860 # build up sets of all the aborted_by and aborted_on values
1861 aborted_by, aborted_on = set(), set()
1862 for queue_entry in self.queue_entries:
1863 if queue_entry.aborted_by:
1864 aborted_by.add(queue_entry.aborted_by)
1865 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1866 aborted_on.add(t)
1867
1868 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001869 # TODO(showard): this conditional is now obsolete, we just need to leave
1870 # it in temporarily for backwards compatibility over upgrades. delete
1871 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001872 assert len(aborted_by) <= 1
1873 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001874 aborted_by_value = aborted_by.pop()
1875 aborted_on_value = max(aborted_on)
1876 else:
1877 aborted_by_value = 'autotest_system'
1878 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001879
showarda0382352009-02-11 23:36:43 +00001880 self._write_keyval_after_job("aborted_by", aborted_by_value)
1881 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001882
showardcbd74612008-11-19 21:42:02 +00001883 aborted_on_string = str(datetime.datetime.fromtimestamp(
1884 aborted_on_value))
1885 self._write_status_comment('Job aborted by %s on %s' %
1886 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001887
1888
jadmanski0afbb632008-06-06 21:10:57 +00001889 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001890 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001891 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001892 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001893
1894
jadmanski0afbb632008-06-06 21:10:57 +00001895 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001896 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001897 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001898
1899
1900class QueueTask(AbstractQueueTask):
1901 def __init__(self, queue_entries):
1902 super(QueueTask, self).__init__(queue_entries)
1903 self._set_ids(queue_entries=queue_entries)
1904
1905
1906 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001907 self._check_queue_entry_statuses(
1908 self.queue_entries,
1909 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
1910 models.HostQueueEntry.Status.RUNNING),
1911 allowed_host_statuses=(models.Host.Status.PENDING,
1912 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00001913
1914 super(QueueTask, self).prolog()
1915
1916 for queue_entry in self.queue_entries:
1917 self._write_host_keyvals(queue_entry.host)
1918 queue_entry.host.set_status(models.Host.Status.RUNNING)
1919 queue_entry.host.update_field('dirty', 1)
1920 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
1921 # TODO(gps): Remove this if nothing needs it anymore.
1922 # A potential user is: tko/parser
1923 self.job.write_to_machines_file(self.queue_entries[0])
1924
1925
1926 def _finish_task(self):
1927 super(QueueTask, self)._finish_task()
1928
1929 for queue_entry in self.queue_entries:
1930 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00001931 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00001932
1933
mbligh4608b002010-01-05 18:22:35 +00001934class HostlessQueueTask(AbstractQueueTask):
1935 def __init__(self, queue_entry):
1936 super(HostlessQueueTask, self).__init__([queue_entry])
1937 self.queue_entry_ids = [queue_entry.id]
1938
1939
1940 def prolog(self):
1941 self.queue_entries[0].update_field('execution_subdir', 'hostless')
1942 super(HostlessQueueTask, self).prolog()
1943
1944
mbligh4608b002010-01-05 18:22:35 +00001945 def _finish_task(self):
1946 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00001947 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001948
1949
showardd3dc1992009-04-22 21:01:40 +00001950class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00001951 def __init__(self, queue_entries, log_file_name):
1952 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00001953
showardd1195652009-12-08 22:21:02 +00001954 self.queue_entries = queue_entries
1955
showardd3dc1992009-04-22 21:01:40 +00001956 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00001957 self._autoserv_monitor.attach_to_existing_process(
1958 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00001959
showardd1195652009-12-08 22:21:02 +00001960
1961 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00001962 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00001963 return 'true'
1964 return self._generate_command(
1965 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00001966
1967
1968 def _generate_command(self, results_dir):
1969 raise NotImplementedError('Subclasses must override this')
1970
1971
showardd1195652009-12-08 22:21:02 +00001972 @property
1973 def owner_username(self):
1974 return self.queue_entries[0].job.owner
1975
1976
1977 def _working_directory(self):
1978 return self._get_consistent_execution_path(self.queue_entries)
1979
1980
1981 def _paired_with_monitor(self):
1982 return self._autoserv_monitor
1983
1984
showardd3dc1992009-04-22 21:01:40 +00001985 def _job_was_aborted(self):
1986 was_aborted = None
showardd1195652009-12-08 22:21:02 +00001987 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00001988 queue_entry.update_from_database()
1989 if was_aborted is None: # first queue entry
1990 was_aborted = bool(queue_entry.aborted)
1991 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
jamesren17cadd62010-06-16 23:26:55 +00001992 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
1993 for entry in self.queue_entries]
showardd3dc1992009-04-22 21:01:40 +00001994 email_manager.manager.enqueue_notify_email(
jamesren17cadd62010-06-16 23:26:55 +00001995 'Inconsistent abort state',
1996 'Queue entries have inconsistent abort state:\n' +
1997 '\n'.join(entries))
showardd3dc1992009-04-22 21:01:40 +00001998 # don't crash here, just assume true
1999 return True
2000 return was_aborted
2001
2002
showardd1195652009-12-08 22:21:02 +00002003 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002004 if self._job_was_aborted():
2005 return models.HostQueueEntry.Status.ABORTED
2006
2007 # we'll use a PidfileRunMonitor to read the autoserv exit status
2008 if self._autoserv_monitor.exit_code() == 0:
2009 return models.HostQueueEntry.Status.COMPLETED
2010 return models.HostQueueEntry.Status.FAILED
2011
2012
showardd3dc1992009-04-22 21:01:40 +00002013 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002014 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002015 queue_entry.set_status(status)
2016
2017
2018 def abort(self):
2019 # override AgentTask.abort() to avoid killing the process and ending
2020 # the task. post-job tasks continue when the job is aborted.
2021 pass
2022
2023
mbligh4608b002010-01-05 18:22:35 +00002024 def _pidfile_label(self):
2025 # '.autoserv_execute' -> 'autoserv'
2026 return self._pidfile_name()[1:-len('_execute')]
2027
2028
showard9bb960b2009-11-19 01:02:11 +00002029class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002030 """
2031 Task responsible for
2032 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2033 * copying logs to the results repository
2034 * spawning CleanupTasks for hosts, if necessary
2035 * spawning a FinalReparseTask for the job
2036 """
showardd1195652009-12-08 22:21:02 +00002037 def __init__(self, queue_entries, recover_run_monitor=None):
2038 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002039 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002040 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002041 self._set_ids(queue_entries=queue_entries)
2042
2043
2044 def _generate_command(self, results_dir):
2045 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002046 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00002047        return [_autoserv_path, '-p',
2048 '--pidfile-label=%s' % self._pidfile_label(),
2049 '--use-existing-results', '--collect-crashinfo',
2050 '-m', host_list, '-r', results_dir]
showardd3dc1992009-04-22 21:01:40 +00002051
2052
showardd1195652009-12-08 22:21:02 +00002053 @property
2054 def num_processes(self):
2055 return len(self.queue_entries)
2056
2057
2058 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002059 return drone_manager.CRASHINFO_PID_FILE
showardd1195652009-12-08 22:21:02 +00002060
2061
showardd3dc1992009-04-22 21:01:40 +00002062 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002063 self._check_queue_entry_statuses(
2064 self.queue_entries,
2065 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
2066 allowed_host_statuses=(models.Host.Status.RUNNING,))
showard8cc058f2009-09-08 16:26:33 +00002067
showardd3dc1992009-04-22 21:01:40 +00002068 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002069
2070
showardd3dc1992009-04-22 21:01:40 +00002071 def epilog(self):
2072 super(GatherLogsTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002073 self._parse_results(self.queue_entries)
showard9bb960b2009-11-19 01:02:11 +00002074 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002075
showard9bb960b2009-11-19 01:02:11 +00002076
2077 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002078 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002079 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002080 models.HostQueueEntry.Status.COMPLETED)
2081 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2082 else:
2083 final_success = False
2084 num_tests_failed = 0
2085
showard9bb960b2009-11-19 01:02:11 +00002086 reboot_after = self._job.reboot_after
2087 do_reboot = (
2088 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002089 self._final_status() == models.HostQueueEntry.Status.ABORTED
jamesrendd855242010-03-02 22:23:44 +00002090 or reboot_after == model_attributes.RebootAfter.ALWAYS
2091 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
showard9bb960b2009-11-19 01:02:11 +00002092 and final_success and num_tests_failed == 0))
2093
showardd1195652009-12-08 22:21:02 +00002094 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002095 if do_reboot:
2096 # don't pass the queue entry to the CleanupTask. if the cleanup
2097 # fails, the job doesn't care -- it's over.
2098 models.SpecialTask.objects.create(
2099 host=models.Host.objects.get(id=queue_entry.host.id),
2100 task=models.SpecialTask.Task.CLEANUP,
2101 requested_by=self._job.owner_model())
2102 else:
2103 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002104
2105
showard0bbfc212009-04-29 21:06:13 +00002106 def run(self):
showard597bfd32009-05-08 18:22:50 +00002107 autoserv_exit_code = self._autoserv_monitor.exit_code()
2108 # only run if Autoserv exited due to some signal. if we have no exit
2109 # code, assume something bad (and signal-like) happened.
2110 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002111 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002112 else:
2113 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002114
2115
mbligh4608b002010-01-05 18:22:35 +00002116class SelfThrottledPostJobTask(PostJobTask):
2117 """
2118 Special AgentTask subclass that maintains its own global process limit.
2119 """
2120 _num_running_processes = 0
showarded2afea2009-07-07 20:54:07 +00002121
2122
mbligh4608b002010-01-05 18:22:35 +00002123 @classmethod
2124 def _increment_running_processes(cls):
2125 cls._num_running_processes += 1
mbligh16c722d2008-03-05 00:58:44 +00002126
mblighd5c95802008-03-05 00:33:46 +00002127
mbligh4608b002010-01-05 18:22:35 +00002128 @classmethod
2129 def _decrement_running_processes(cls):
2130 cls._num_running_processes -= 1
showard8cc058f2009-09-08 16:26:33 +00002131
2132
mbligh4608b002010-01-05 18:22:35 +00002133 @classmethod
2134 def _max_processes(cls):
2135 raise NotImplementedError
2136
2137
2138 @classmethod
2139 def _can_run_new_process(cls):
2140 return cls._num_running_processes < cls._max_processes()
2141
2142
2143 def _process_started(self):
2144 return bool(self.monitor)
2145
2146
2147 def tick(self):
2148         # override tick() to keep trying to start until the process count drops
2149         # enough that we can start; at that point we revert to default behavior
2150 if self._process_started():
2151 super(SelfThrottledPostJobTask, self).tick()
2152 else:
2153 self._try_starting_process()
2154
2155
2156 def run(self):
2157 # override run() to not actually run unless we can
2158 self._try_starting_process()
2159
2160
2161 def _try_starting_process(self):
2162 if not self._can_run_new_process():
showard775300b2009-09-09 15:30:50 +00002163 return
2164
mbligh4608b002010-01-05 18:22:35 +00002165 # actually run the command
2166 super(SelfThrottledPostJobTask, self).run()
jamesren25663562010-04-27 18:00:55 +00002167 if self._process_started():
2168 self._increment_running_processes()
mblighd5c95802008-03-05 00:33:46 +00002169
mblighd5c95802008-03-05 00:33:46 +00002170
mbligh4608b002010-01-05 18:22:35 +00002171 def finished(self, success):
2172 super(SelfThrottledPostJobTask, self).finished(success)
2173 if self._process_started():
2174 self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002175
showard21baa452008-10-21 00:08:39 +00002176
mbligh4608b002010-01-05 18:22:35 +00002177class FinalReparseTask(SelfThrottledPostJobTask):
showardd1195652009-12-08 22:21:02 +00002178 def __init__(self, queue_entries):
2179 super(FinalReparseTask, self).__init__(queue_entries,
2180 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002181 # don't use _set_ids, since we don't want to set the host_ids
2182 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002183
2184
2185 def _generate_command(self, results_dir):
mbligh4608b002010-01-05 18:22:35 +00002186 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd1195652009-12-08 22:21:02 +00002187 results_dir]
2188
2189
2190 @property
2191 def num_processes(self):
2192 return 0 # don't include parser processes in accounting
2193
2194
2195 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002196 return drone_manager.PARSER_PID_FILE
showardd1195652009-12-08 22:21:02 +00002197
2198
showard97aed502008-11-04 02:01:24 +00002199 @classmethod
mbligh4608b002010-01-05 18:22:35 +00002200 def _max_processes(cls):
2201 return scheduler_config.config.max_parse_processes
showard97aed502008-11-04 02:01:24 +00002202
2203
2204 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002205 self._check_queue_entry_statuses(
2206 self.queue_entries,
2207 allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
showard8cc058f2009-09-08 16:26:33 +00002208
showard97aed502008-11-04 02:01:24 +00002209 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002210
2211
2212 def epilog(self):
2213 super(FinalReparseTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002214 self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002215
2216
mbligh4608b002010-01-05 18:22:35 +00002217class ArchiveResultsTask(SelfThrottledPostJobTask):
showarde1575b52010-01-15 00:21:12 +00002218 _ARCHIVING_FAILED_FILE = '.archiver_failed'
2219
mbligh4608b002010-01-05 18:22:35 +00002220 def __init__(self, queue_entries):
2221 super(ArchiveResultsTask, self).__init__(queue_entries,
2222 log_file_name='.archiving.log')
2223 # don't use _set_ids, since we don't want to set the host_ids
2224 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00002225
2226
mbligh4608b002010-01-05 18:22:35 +00002227 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002228 return drone_manager.ARCHIVER_PID_FILE
showard97aed502008-11-04 02:01:24 +00002229
2230
mbligh4608b002010-01-05 18:22:35 +00002231 def _generate_command(self, results_dir):
2232 return [_autoserv_path , '-p',
2233 '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
mblighe0cbc912010-03-11 18:03:07 +00002234 '--use-existing-results', '--control-filename=control.archive',
showard948eb302010-01-15 00:16:20 +00002235 os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
2236 'archive_results.control.srv')]
showard97aed502008-11-04 02:01:24 +00002237
2238
mbligh4608b002010-01-05 18:22:35 +00002239 @classmethod
2240 def _max_processes(cls):
2241 return scheduler_config.config.max_transfer_processes
showarda9545c02009-12-18 22:44:26 +00002242
2243
2244 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002245 self._check_queue_entry_statuses(
2246 self.queue_entries,
2247 allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
2248
2249 super(ArchiveResultsTask, self).prolog()
showarda9545c02009-12-18 22:44:26 +00002250
2251
mbligh4608b002010-01-05 18:22:35 +00002252 def epilog(self):
2253 super(ArchiveResultsTask, self).epilog()
showard4076c632010-01-15 20:28:49 +00002254 if not self.success and self._paired_with_monitor().has_process():
showarde1575b52010-01-15 00:21:12 +00002255 failed_file = os.path.join(self._working_directory(),
2256 self._ARCHIVING_FAILED_FILE)
2257 paired_process = self._paired_with_monitor().get_process()
2258 _drone_manager.write_lines_to_file(
2259 failed_file, ['Archiving failed with exit code %s'
2260 % self.monitor.exit_code()],
2261 paired_with_process=paired_process)
mbligh4608b002010-01-05 18:22:35 +00002262 self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002263
2264
mbligh36768f02008-02-22 18:28:33 +00002265if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002266 main()