#!/usr/bin/python -u
#pylint: disable-msg=C0111

"""
Autotest scheduler
"""


import datetime, optparse, os, signal
import sys, time, traceback, urllib
import logging, gc

import common
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.server import autoserv_utils
from autotest_lib.site_utils.graphite import stats

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_directory = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server')
_autoserv_path = os.path.join(_autoserv_directory, 'autoserv')
_testing_mode = False
_drone_manager = None

def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60


def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
            logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
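# Illustrative sketch only: the exact flags are assembled by
# autoserv_utils.autoserv_run_job_command(), not in this module.  For a single
# machine the returned list looks roughly like
#     [_autoserv_path, '-m', 'host1', '-r', <results dir>] + extra_args
# with '-u <owner>', '-l <name>', '--test-retry' and a client (-c) or
# server (-s) flag added when a job is supplied, as described in the
# docstring above.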


class BaseDispatcher(object):


    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)
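    # The values read above come from the SCHEDULER section of global_config
    # (scheduler_config.CONFIG_SECTION).  An illustrative global_config.ini
    # fragment; the option names are taken from the get_config_value() calls
    # in __init__, and the values shown are examples only:
    #
    #     [SCHEDULER]
    #     gc_stats_interval_mins: 360
    #     tick_debug: True
    #     extra_debugging: False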


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def _log_extra_msg(self, msg):
        if self._extra_debugging:
            logging.debug(msg)


    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))
407
408
jadmanski0afbb632008-06-06 21:10:57 +0000409 def _recover_processes(self):
showardd1195652009-12-08 22:21:02 +0000410 agent_tasks = self._create_recovery_agent_tasks()
411 self._register_pidfiles(agent_tasks)
showard170873e2009-01-07 00:22:26 +0000412 _drone_manager.refresh()
showardd1195652009-12-08 22:21:02 +0000413 self._recover_tasks(agent_tasks)
showard8cc058f2009-09-08 16:26:33 +0000414 self._recover_pending_entries()
showardb8900452009-10-12 20:31:01 +0000415 self._check_for_unrecovered_verifying_entries()
showard170873e2009-01-07 00:22:26 +0000416 self._reverify_remaining_hosts()
417 # reinitialize drones after killing orphaned processes, since they can
418 # leave around files when they die
419 _drone_manager.execute_actions()
420 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000421
showard170873e2009-01-07 00:22:26 +0000422
showardd1195652009-12-08 22:21:02 +0000423 def _create_recovery_agent_tasks(self):
424 return (self._get_queue_entry_agent_tasks()
425 + self._get_special_task_agent_tasks(is_active=True))
426
427
428 def _get_queue_entry_agent_tasks(self):
429 # host queue entry statuses handled directly by AgentTasks (Verifying is
430 # handled through SpecialTasks, so is not listed here)
431 statuses = (models.HostQueueEntry.Status.STARTING,
432 models.HostQueueEntry.Status.RUNNING,
433 models.HostQueueEntry.Status.GATHERING,
mbligh4608b002010-01-05 18:22:35 +0000434 models.HostQueueEntry.Status.PARSING,
435 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000436 status_list = ','.join("'%s'" % status for status in statuses)
jamesrenc44ae992010-02-19 00:12:54 +0000437 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardd1195652009-12-08 22:21:02 +0000438 where='status IN (%s)' % status_list)
439
440 agent_tasks = []
441 used_queue_entries = set()
442 for entry in queue_entries:
443 if self.get_agents_for_entry(entry):
444 # already being handled
445 continue
446 if entry in used_queue_entries:
447 # already picked up by a synchronous job
448 continue
449 agent_task = self._get_agent_task_for_queue_entry(entry)
450 agent_tasks.append(agent_task)
451 used_queue_entries.update(agent_task.queue_entries)
452 return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct and return an AgentTask to run the given SpecialTask; the
        caller is responsible for adding it to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask,
                                      ResetTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where="status = '%s'" % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify, then reset.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY,
                               models.SpecialTask.Task.RESET]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='afe_jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list.  Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        logging.debug('Processing %d queue_entries', len(queue_entries))
        for queue_entry in queue_entries:
            self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._log_extra_msg('Scheduling hostless job.')
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)


    def _schedule_running_host_queue_entries(self):
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
                            queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
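    # Worked example of the per-cycle throttle above (numbers made up): with
    # max_processes_started_per_cycle = 50, an agent needing 10 processes is
    # allowed while num_started_this_cycle is at most 40 and deferred once the
    # sum would exceed 50; _handle_agents() then sets have_reached_limit so no
    # further multi-process agents start this cycle.  The first nonzero-process
    # agent of a cycle bypasses the per-cycle limit (though not the
    # total-process limit).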


    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_cycle += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes. %d added this cycle.',
                     _drone_manager.total_running_processes(),
                     num_started_this_cycle)


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


SiteDispatcher = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    pass


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, num_processes, nice_level=None,
            log_file=None, pidfile_name=None, paired_with_pidfile=None,
            username=None, drone_hostnames_allowed=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            num_processes=num_processes, log_file=log_file,
            paired_with_pidfile=paired_with_pidfile, username=username,
            drone_hostnames_allowed=drone_hostnames_allowed)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=drone_manager.AUTOSERV_PID_FILE,
                                   num_processes=None):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
                execution_path, pidfile_name=pidfile_name)
        if num_processes is not None:
            _drone_manager.declare_process_count(self.pidfile_id, num_processes)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                        'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
         pid=None, exit_status=None if autoserv has not yet run
         pid!=None, exit_status=None if autoserv is running
         pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        if time.time() - self._start_time > _get_pidfile_timeout_secs():
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile.  In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """@returns The number of tests that failed or -1 if unknown."""
        self._get_pidfile_info()
        if self._state.num_tests_failed is None:
            return -1
        return self._state.num_tests_failed


    def try_copy_results_on_drone(self, **kwargs):
        if self.has_process():
            # copy results logs into the normal place for job results
            _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)


    def try_copy_to_results_repository(self, source, **kwargs):
        if self.has_process():
            _drone_manager.copy_to_results_repository(self.get_process(),
                                                      source, **kwargs)


class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.  An agent wraps
    around an AgentTask mainly to associate the AgentTask with the queue_entry
    and host ids.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.  Once the task has finished, it
                should set its success attribute accordingly.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested.  The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: An instance of an AgentTask.
        """
        self.task = task

        # This is filled in by Dispatcher.add_agent_task()
        self.dispatcher = None

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        self.started = True
        if not self.finished:
            self.task.poll()
            if self.task.is_done():
                self.finished = True


    def is_done(self):
        return self.finished


    def abort(self):
        if self.task:
            self.task.abort()
            if self.task.aborted:
                # tasks can choose to ignore aborts
                self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001121
showardd3dc1992009-04-22 21:01:40 +00001122
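# A minimal sketch (not part of the scheduler) of an object satisfying the
# task interface documented on Agent above. The class and its behavior are
# hypothetical and purely illustrative: it "finishes" successfully on the
# first poll() call instead of driving an autoserv process.
class _ExampleNoopTask(object):
    def __init__(self):
        self.success = None
        self.aborted = False
        self.queue_entry_ids = []
        self.host_ids = []
        self._done = False


    def poll(self):
        # A real AgentTask checks its process/pidfile here; this example
        # just records success immediately.
        self._done = True
        self.success = True


    def is_done(self):
        return self._done


    def abort(self):
        self.aborted = True
        self._done = True

# Such an object would be wrapped as Agent(_ExampleNoopTask()) and handed to
# the dispatcher via add_agent().

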
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001123class BaseAgentTask(object):
showardd1195652009-12-08 22:21:02 +00001124 class _NullMonitor(object):
1125 pidfile_id = None
1126
1127 def has_process(self):
1128 return True
1129
1130
1131 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001132 """
showardd1195652009-12-08 22:21:02 +00001133 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001134 """
jadmanski0afbb632008-06-06 21:10:57 +00001135 self.done = False
showardd1195652009-12-08 22:21:02 +00001136 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001137 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001138 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001139 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001140 self.queue_entry_ids = []
1141 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001142 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001143
1144
1145 def _set_ids(self, host=None, queue_entries=None):
1146 if queue_entries and queue_entries != [None]:
1147 self.host_ids = [entry.host.id for entry in queue_entries]
1148 self.queue_entry_ids = [entry.id for entry in queue_entries]
1149 else:
1150 assert host
1151 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001152
1153
jadmanski0afbb632008-06-06 21:10:57 +00001154 def poll(self):
showard08a36412009-05-05 01:01:13 +00001155 if not self.started:
1156 self.start()
showardd1195652009-12-08 22:21:02 +00001157 if not self.done:
1158 self.tick()
showard08a36412009-05-05 01:01:13 +00001159
1160
1161 def tick(self):
showardd1195652009-12-08 22:21:02 +00001162 assert self.monitor
1163 exit_code = self.monitor.exit_code()
1164 if exit_code is None:
1165 return
mbligh36768f02008-02-22 18:28:33 +00001166
showardd1195652009-12-08 22:21:02 +00001167 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001168 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001169
1170
jadmanski0afbb632008-06-06 21:10:57 +00001171 def is_done(self):
1172 return self.done
mbligh36768f02008-02-22 18:28:33 +00001173
1174
jadmanski0afbb632008-06-06 21:10:57 +00001175 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001176 if self.done:
showardd1195652009-12-08 22:21:02 +00001177 assert self.started
showard08a36412009-05-05 01:01:13 +00001178 return
showardd1195652009-12-08 22:21:02 +00001179 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001180 self.done = True
1181 self.success = success
1182 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001183
1184
jadmanski0afbb632008-06-06 21:10:57 +00001185 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001186 """
1187 To be overridden.
1188 """
showarded2afea2009-07-07 20:54:07 +00001189 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001190 self.register_necessary_pidfiles()
1191
1192
1193 def _log_file(self):
1194 if not self._log_file_name:
1195 return None
1196 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001197
mbligh36768f02008-02-22 18:28:33 +00001198
jadmanski0afbb632008-06-06 21:10:57 +00001199 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001200 log_file = self._log_file()
1201 if self.monitor and log_file:
1202 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001203
1204
jadmanski0afbb632008-06-06 21:10:57 +00001205 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001206 """
1207 To be overridden.
1208 """
jadmanski0afbb632008-06-06 21:10:57 +00001209 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001210 logging.info("%s finished with success=%s", type(self).__name__,
1211 self.success)
1212
mbligh36768f02008-02-22 18:28:33 +00001213
1214
jadmanski0afbb632008-06-06 21:10:57 +00001215 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001216 if not self.started:
1217 self.prolog()
1218 self.run()
1219
1220 self.started = True
1221
1222
1223 def abort(self):
1224 if self.monitor:
1225 self.monitor.kill()
1226 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001227 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001228 self.cleanup()
1229
1230
showarded2afea2009-07-07 20:54:07 +00001231 def _get_consistent_execution_path(self, execution_entries):
1232 first_execution_path = execution_entries[0].execution_path()
1233 for execution_entry in execution_entries[1:]:
1234 assert execution_entry.execution_path() == first_execution_path, (
1235 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1236 execution_entry,
1237 first_execution_path,
1238 execution_entries[0]))
1239 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001240
1241
showarded2afea2009-07-07 20:54:07 +00001242 def _copy_results(self, execution_entries, use_monitor=None):
1243 """
1244 @param execution_entries: list of objects with execution_path() method
1245 """
showard6d1c1432009-08-20 23:30:39 +00001246 if use_monitor is not None and not use_monitor.has_process():
1247 return
1248
showarded2afea2009-07-07 20:54:07 +00001249 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001250 if use_monitor is None:
1251 assert self.monitor
1252 use_monitor = self.monitor
1253 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001254 execution_path = self._get_consistent_execution_path(execution_entries)
1255 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001256 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001257
showarda1e74b32009-05-12 17:32:04 +00001258
1259 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001260 for queue_entry in queue_entries:
1261 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001262
1263
mbligh4608b002010-01-05 18:22:35 +00001264 def _archive_results(self, queue_entries):
1265 for queue_entry in queue_entries:
1266 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001267
1268
showardd1195652009-12-08 22:21:02 +00001269 def _command_line(self):
1270 """
1271 Return the command line to run. Must be overridden.
1272 """
1273 raise NotImplementedError
1274
1275
1276 @property
1277 def num_processes(self):
1278 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001279 Return the number of processes forked by this BaseAgentTask's process.
1280 It may only be approximate. To be overridden if necessary.
showardd1195652009-12-08 22:21:02 +00001281 """
1282 return 1
1283
1284
1285 def _paired_with_monitor(self):
1286 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001287 If this BaseAgentTask's process must run on the same machine as some
showardd1195652009-12-08 22:21:02 +00001288 previous process, this method should be overridden to return a
1289 PidfileRunMonitor for that process.
1290 """
1291 return self._NullMonitor()
1292
1293
1294 @property
1295 def owner_username(self):
1296 """
1297 Return login of user responsible for this task. May be None. Must be
1298 overridden.
1299 """
1300 raise NotImplementedError
1301
1302
1303 def _working_directory(self):
1304 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001305 Return the directory where this BaseAgentTask's process executes.
1306 Must be overridden.
showardd1195652009-12-08 22:21:02 +00001307 """
1308 raise NotImplementedError
1309
1310
1311 def _pidfile_name(self):
1312 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001313 Return the name of the pidfile this BaseAgentTask's process uses. To be
showardd1195652009-12-08 22:21:02 +00001314 overridden if necessary.
1315 """
jamesrenc44ae992010-02-19 00:12:54 +00001316 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001317
1318
1319 def _check_paired_results_exist(self):
1320 if not self._paired_with_monitor().has_process():
1321 email_manager.manager.enqueue_notify_email(
1322 'No paired results in task',
1323 'No paired results in task %s at %s'
1324 % (self, self._paired_with_monitor().pidfile_id))
1325 self.finished(False)
1326 return False
1327 return True
1328
1329
1330 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001331 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001332 self.monitor = PidfileRunMonitor()
1333
1334
1335 def run(self):
1336 if not self._check_paired_results_exist():
1337 return
1338
1339 self._create_monitor()
1340 self.monitor.run(
1341 self._command_line(), self._working_directory(),
1342 num_processes=self.num_processes,
1343 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1344 pidfile_name=self._pidfile_name(),
1345 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001346 username=self.owner_username,
1347 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1348
1349
1350 def get_drone_hostnames_allowed(self):
1351 if not models.DroneSet.drone_sets_enabled():
1352 return None
1353
1354 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1355 if not hqes:
1356 # Only special tasks could be missing host queue entries
1357 assert isinstance(self, SpecialAgentTask)
1358 return self._user_or_global_default_drone_set(
1359 self.task, self.task.requested_by)
1360
1361 job_ids = hqes.values_list('job', flat=True).distinct()
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001362 assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
jamesren76fcf192010-04-21 20:39:50 +00001363 "span multiple jobs")
1364
1365 job = models.Job.objects.get(id=job_ids[0])
1366 drone_set = job.drone_set
1367 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001368 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001369
1370 return drone_set.get_drone_hostnames()
1371
1372
1373 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1374 """
1375 Returns the user's default drone set, if present.
1376
1377 Otherwise, returns the global default drone set.
1378 """
1379 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1380 if not user:
1381 logging.warn('%s had no owner; using default drone set',
1382 obj_with_owner)
1383 return default_hostnames
1384 if not user.drone_set:
1385 logging.warn('User %s has no default drone set, using global '
1386 'default', user.login)
1387 return default_hostnames
1388 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001389
1390
1391 def register_necessary_pidfiles(self):
1392 pidfile_id = _drone_manager.get_pidfile_id_from(
1393 self._working_directory(), self._pidfile_name())
1394 _drone_manager.register_pidfile(pidfile_id)
1395
1396 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1397 if paired_pidfile_id:
1398 _drone_manager.register_pidfile(paired_pidfile_id)
1399
1400
1401 def recover(self):
1402 if not self._check_paired_results_exist():
1403 return
1404
1405 self._create_monitor()
1406 self.monitor.attach_to_existing_process(
1407 self._working_directory(), pidfile_name=self._pidfile_name(),
1408 num_processes=self.num_processes)
1409 if not self.monitor.has_process():
1410 # no process to recover; wait to be started normally
1411 self.monitor = None
1412 return
1413
1414 self.started = True
Aviv Keshet225bdfe2013-03-05 10:10:08 -08001415 logging.info('Recovering process %s for %s at %s',
1416 self.monitor.get_process(), type(self).__name__,
1417 self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001418
1419
mbligh4608b002010-01-05 18:22:35 +00001420 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1421 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001422 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001423 for entry in queue_entries:
1424 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001425 raise host_scheduler.SchedulerError(
1426 '%s attempting to start entry with invalid status %s: '
1427 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001428 invalid_host_status = (
1429 allowed_host_statuses is not None
1430 and entry.host.status not in allowed_host_statuses)
1431 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001432 raise host_scheduler.SchedulerError(
1433 '%s attempting to start on queue entry with invalid '
1434 'host status %s: %s'
1435 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001436
1437
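# Site-extension hook: utils.import_site_class returns a SiteAgentTask class
# from autotest_lib.scheduler.site_monitor_db when that site module defines
# one, and otherwise falls back to BaseAgentTask, so AgentTask below picks up
# site-specific behavior transparently when it exists.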
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001438SiteAgentTask = utils.import_site_class(
1439 __file__, 'autotest_lib.scheduler.site_monitor_db',
1440 'SiteAgentTask', BaseAgentTask)
1441
1442class AgentTask(SiteAgentTask):
1443 pass
1444
1445
showardd9205182009-04-27 20:09:55 +00001446class TaskWithJobKeyvals(object):
1447 """AgentTask mixin providing functionality to help with job keyval files."""
1448 _KEYVAL_FILE = 'keyval'
1449 def _format_keyval(self, key, value):
1450 return '%s=%s' % (key, value)
1451
1452
1453 def _keyval_path(self):
1454 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001455 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001456
1457
1458 def _write_keyval_after_job(self, field, value):
1459 assert self.monitor
1460 if not self.monitor.has_process():
1461 return
1462 _drone_manager.write_lines_to_file(
1463 self._keyval_path(), [self._format_keyval(field, value)],
1464 paired_with_process=self.monitor.get_process())
1465
1466
1467 def _job_queued_keyval(self, job):
1468 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1469
1470
1471 def _write_job_finished(self):
1472 self._write_keyval_after_job("job_finished", int(time.time()))
1473
1474
showarddb502762009-09-09 15:31:20 +00001475 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1476 keyval_contents = '\n'.join(self._format_keyval(key, value)
1477 for key, value in keyval_dict.iteritems())
1478 # always end with a newline to allow additional keyvals to be written
1479 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001480 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001481 keyval_contents,
1482 file_path=keyval_path)
1483
1484
1485 def _write_keyvals_before_job(self, keyval_dict):
1486 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1487
1488
1489 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001490 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001491 host.hostname)
1492 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001493 all_labels = [urllib.quote(label) for label in all_labels]
showarddb502762009-09-09 15:31:20 +00001494 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1495 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1496
1497
showard8cc058f2009-09-08 16:26:33 +00001498class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001499 """
1500 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1501 """
1502
1503 TASK_TYPE = None
1504 host = None
1505 queue_entry = None
1506
showardd1195652009-12-08 22:21:02 +00001507 def __init__(self, task, extra_command_args):
1508 super(SpecialAgentTask, self).__init__()
1509
lmrb7c5d272010-04-16 06:34:04 +00001510 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001511
jamesrenc44ae992010-02-19 00:12:54 +00001512 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001513 self.queue_entry = None
1514 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001515 self.queue_entry = scheduler_models.HostQueueEntry(
1516 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001517
showarded2afea2009-07-07 20:54:07 +00001518 self.task = task
1519 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001520
1521
showard8cc058f2009-09-08 16:26:33 +00001522 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001523 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1524
1525
1526 def _command_line(self):
1527 return _autoserv_command_line(self.host.hostname,
1528 self._extra_command_args,
1529 queue_entry=self.queue_entry)
1530
1531
1532 def _working_directory(self):
1533 return self.task.execution_path()
1534
1535
1536 @property
1537 def owner_username(self):
1538 if self.task.requested_by:
1539 return self.task.requested_by.login
1540 return None
showard8cc058f2009-09-08 16:26:33 +00001541
1542
showarded2afea2009-07-07 20:54:07 +00001543 def prolog(self):
1544 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001545 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001546 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001547
1548
showardde634ee2009-01-30 01:44:24 +00001549 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001550 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001551
showard2fe3f1d2009-07-06 20:19:11 +00001552 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001553 return # don't fail metahost entries, they'll be reassigned
1554
showard2fe3f1d2009-07-06 20:19:11 +00001555 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001556 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001557 return # entry has been aborted
1558
showard2fe3f1d2009-07-06 20:19:11 +00001559 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001560 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001561 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001562 self._write_keyval_after_job(queued_key, queued_time)
1563 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001564
showard8cc058f2009-09-08 16:26:33 +00001565 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001566 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001567 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001568 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001569
showard8cc058f2009-09-08 16:26:33 +00001570 pidfile_id = _drone_manager.get_pidfile_id_from(
1571 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001572 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001573 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001574
1575 if self.queue_entry.job.parse_failed_repair:
1576 self._parse_results([self.queue_entry])
1577 else:
1578 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001579
Alex Miller23676a22013-07-03 09:03:36 -07001580 # Also fail all other special tasks that have not yet run for this HQE
1581 pending_tasks = models.SpecialTask.objects.filter(
1582 queue_entry__id=self.queue_entry.id,
1583 is_complete=0)
1584 if pending_tasks:
1585 for task in pending_tasks:
1586 task.finish(False)
1587
showard8cc058f2009-09-08 16:26:33 +00001588
1589 def cleanup(self):
1590 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001591
1592 # We will consider an aborted task to be "Failed"
1593 self.task.finish(bool(self.success))
1594
showardf85a0b72009-10-07 20:48:45 +00001595 if self.monitor:
1596 if self.monitor.has_process():
1597 self._copy_results([self.task])
1598 if self.monitor.pidfile_id is not None:
1599 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001600
1601
Dan Shi07e09af2013-04-12 09:31:29 -07001602 def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
1603 """Remove a type of special task in all tasks, keep last one if needed.
1604
1605 @param special_task_to_remove: type of special task to be removed, e.g.,
1606 models.SpecialTask.Task.VERIFY.
1607 @param keep_last_one: True to exclude this task's own entry from the
1608 deletion, keeping the most recent task of that type.
1609
1610 """
1611 queued_special_tasks = models.SpecialTask.objects.filter(
1612 host__id=self.host.id,
1613 task=special_task_to_remove,
1614 is_active=False, is_complete=False, queue_entry=None)
1615 if keep_last_one:
1616 queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
1617 queued_special_tasks.delete()
1618
1619
showard8cc058f2009-09-08 16:26:33 +00001620class RepairTask(SpecialAgentTask):
1621 TASK_TYPE = models.SpecialTask.Task.REPAIR
1622
1623
showardd1195652009-12-08 22:21:02 +00001624 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001625 """\
1626 task: SpecialTask entry for this repair; its queue_entry (if any) will be marked failed if this repair fails.
1627 """
1628 protection = host_protections.Protection.get_string(
1629 task.host.protection)
1630 # normalize the protection name
1631 protection = host_protections.Protection.get_attr_name(protection)
1632
1633 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001634 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001635
1636 # *don't* include the queue entry in IDs -- if the queue entry is
1637 # aborted, we want to leave the repair task running
1638 self._set_ids(host=self.host)
1639
1640
1641 def prolog(self):
1642 super(RepairTask, self).prolog()
1643 logging.info("repair_task starting")
1644 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001645
1646
jadmanski0afbb632008-06-06 21:10:57 +00001647 def epilog(self):
1648 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001649
jadmanski0afbb632008-06-06 21:10:57 +00001650 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001651 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001652 else:
showard8cc058f2009-09-08 16:26:33 +00001653 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001654 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001655 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001656
1657
showarded2afea2009-07-07 20:54:07 +00001658class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001659 def _copy_to_results_repository(self):
1660 if not self.queue_entry or self.queue_entry.meta_host:
1661 return
1662
1663 self.queue_entry.set_execution_subdir()
1664 log_name = os.path.basename(self.task.execution_path())
1665 source = os.path.join(self.task.execution_path(), 'debug',
1666 'autoserv.DEBUG')
1667 destination = os.path.join(
1668 self.queue_entry.execution_path(), log_name)
1669
1670 self.monitor.try_copy_to_results_repository(
1671 source, destination_path=destination)
1672
1673
showard170873e2009-01-07 00:22:26 +00001674 def epilog(self):
1675 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001676
showard775300b2009-09-09 15:30:50 +00001677 if self.success:
1678 return
showard8fe93b52008-11-18 17:53:22 +00001679
showard775300b2009-09-09 15:30:50 +00001680 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001681
showard775300b2009-09-09 15:30:50 +00001682 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001683 # effectively ignore failure for these hosts
1684 self.success = True
showard775300b2009-09-09 15:30:50 +00001685 return
1686
1687 if self.queue_entry:
1688 self.queue_entry.requeue()
Alex Millerf3f19452013-07-29 15:53:00 -07001689 # If we requeue a HQE, we should cancel any remaining pre-job
1690 # tasks against this host, otherwise we'll be left in a state
1691 # where a queued HQE has special tasks to run against a host.
1692 models.SpecialTask.objects.filter(
1693 queue_entry__id=self.queue_entry.id,
1694 host__id=self.host.id,
1695 is_complete=0).update(is_complete=1, success=0)
showard775300b2009-09-09 15:30:50 +00001696
1697 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001698 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001699 queue_entry__id=self.queue_entry.id):
1700 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1701 self._fail_queue_entry()
1702 return
1703
showard9bb960b2009-11-19 01:02:11 +00001704 queue_entry = models.HostQueueEntry.objects.get(
1705 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001706 else:
1707 queue_entry = None
1708
1709 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001710 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001711 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001712 queue_entry=queue_entry,
1713 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001714
showard8fe93b52008-11-18 17:53:22 +00001715
Alex Miller42437f92013-05-28 12:58:54 -07001716 def _should_pending(self):
1717 """
1718 Decide if we should call the host queue entry's on_pending method.
1719 We should if:
1720 1) There exists an associated host queue entry.
1721 2) The current special task completed successfully.
1722 3) There do not exist any more special tasks to be run before the
1723 host queue entry starts.
1724
1725 @returns: True if we should call on_pending, False if not.
1726
1727 """
1728 if not self.queue_entry or not self.success:
1729 return False
1730
1731 # We know if this is the last one when we create it, so we could add
1732 # another column to the database to keep track of this information, but
1733 # I expect the overhead of querying here to be minimal.
1734 queue_entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1735 queued = models.SpecialTask.objects.filter(
1736 host__id=self.host.id, is_active=False,
1737 is_complete=False, queue_entry=queue_entry)
1738 queued = queued.exclude(id=self.task.id)
1739 return queued.count() == 0
1740
1741
showard8fe93b52008-11-18 17:53:22 +00001742class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001743 TASK_TYPE = models.SpecialTask.Task.VERIFY
1744
1745
showardd1195652009-12-08 22:21:02 +00001746 def __init__(self, task):
1747 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001748 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001749
1750
jadmanski0afbb632008-06-06 21:10:57 +00001751 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001752 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001753
showardb18134f2009-03-20 20:52:18 +00001754 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001755 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001756 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1757 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001758
jamesren42318f72010-05-10 23:40:59 +00001759 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00001760 # and there's no need to keep records of other requests.
Dan Shi07e09af2013-04-12 09:31:29 -07001761 self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
1762 keep_last_one=True)
showard2fe3f1d2009-07-06 20:19:11 +00001763
mbligh36768f02008-02-22 18:28:33 +00001764
jadmanski0afbb632008-06-06 21:10:57 +00001765 def epilog(self):
1766 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001767 if self.success:
Alex Miller42437f92013-05-28 12:58:54 -07001768 if self._should_pending():
showard8cc058f2009-09-08 16:26:33 +00001769 self.queue_entry.on_pending()
1770 else:
1771 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001772
1773
mbligh4608b002010-01-05 18:22:35 +00001774class CleanupTask(PreJobTask):
1775 # note this can also run post-job, but when it does, it's running standalone
1776 # against the host (not related to the job), so it's not considered a
1777 # PostJobTask
1778
1779 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1780
1781
1782 def __init__(self, task, recover_run_monitor=None):
1783 super(CleanupTask, self).__init__(task, ['--cleanup'])
1784 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1785
1786
1787 def prolog(self):
1788 super(CleanupTask, self).prolog()
1789 logging.info("starting cleanup task for host: %s", self.host.hostname)
1790 self.host.set_status(models.Host.Status.CLEANING)
1791 if self.queue_entry:
Dan Shi07e09af2013-04-12 09:31:29 -07001792 self.queue_entry.set_status(models.HostQueueEntry.Status.CLEANING)
mbligh4608b002010-01-05 18:22:35 +00001793
1794
1795 def _finish_epilog(self):
1796 if not self.queue_entry or not self.success:
1797 return
1798
1799 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
1800 should_run_verify = (
1801 self.queue_entry.job.run_verify
1802 and self.host.protection != do_not_verify_protection)
1803 if should_run_verify:
1804 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1805 models.SpecialTask.objects.create(
1806 host=models.Host.objects.get(id=self.host.id),
1807 queue_entry=entry,
1808 task=models.SpecialTask.Task.VERIFY)
1809 else:
Alex Miller42437f92013-05-28 12:58:54 -07001810 if self._should_pending():
1811 self.queue_entry.on_pending()
mbligh4608b002010-01-05 18:22:35 +00001812
1813
1814 def epilog(self):
1815 super(CleanupTask, self).epilog()
1816
1817 if self.success:
1818 self.host.update_field('dirty', 0)
1819 self.host.set_status(models.Host.Status.READY)
1820
1821 self._finish_epilog()
1822
1823
Dan Shi07e09af2013-04-12 09:31:29 -07001824class ResetTask(PreJobTask):
1825 """Task to reset a DUT, including cleanup and verify."""
1826 # note this can also run post-job, but when it does, it's running standalone
1827 # against the host (not related to the job), so it's not considered a
1828 # PostJobTask
1829
1830 TASK_TYPE = models.SpecialTask.Task.RESET
1831
1832
1833 def __init__(self, task, recover_run_monitor=None):
1834 super(ResetTask, self).__init__(task, ['--reset'])
1835 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1836
1837
1838 def prolog(self):
1839 super(ResetTask, self).prolog()
1840 logging.info('starting reset task for host: %s',
1841 self.host.hostname)
1842 self.host.set_status(models.Host.Status.RESETTING)
1843 if self.queue_entry:
1844 self.queue_entry.set_status(models.HostQueueEntry.Status.RESETTING)
1845
1846 # Delete any queued cleanups for this host.
1847 self.remove_special_tasks(models.SpecialTask.Task.CLEANUP,
1848 keep_last_one=False)
1849
1850 # Delete any queued reverifies for this host.
1851 self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
1852 keep_last_one=False)
1853
1854 # Only one reset is needed.
1855 self.remove_special_tasks(models.SpecialTask.Task.RESET,
1856 keep_last_one=True)
1857
1858
1859 def epilog(self):
1860 super(ResetTask, self).epilog()
1861
1862 if self.success:
1863 self.host.update_field('dirty', 0)
Dan Shi07e09af2013-04-12 09:31:29 -07001864
Alex Millerba076c52013-07-11 10:11:48 -07001865 if self._should_pending():
Dan Shi07e09af2013-04-12 09:31:29 -07001866 self.queue_entry.on_pending()
Alex Millerdc608d52013-07-30 14:26:21 -07001867 else:
1868 self.host.set_status(models.Host.Status.READY)
Dan Shi07e09af2013-04-12 09:31:29 -07001869
1870
showarda9545c02009-12-18 22:44:26 +00001871class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
1872 """
1873 Common functionality for QueueTask and HostlessQueueTask
1874 """
1875 def __init__(self, queue_entries):
1876 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001877 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001878 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001879
1880
showard73ec0442009-02-07 02:05:20 +00001881 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001882 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001883
1884
jamesrenc44ae992010-02-19 00:12:54 +00001885 def _write_control_file(self, execution_path):
1886 control_path = _drone_manager.attach_file_to_execution(
1887 execution_path, self.job.control_file)
1888 return control_path
1889
1890
Aviv Keshet308e7362013-05-21 14:43:16 -07001891 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001892 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001893 execution_path = self.queue_entries[0].execution_path()
1894 control_path = self._write_control_file(execution_path)
1895 hostnames = ','.join(entry.host.hostname
1896 for entry in self.queue_entries
1897 if not entry.is_hostless())
1898
1899 execution_tag = self.queue_entries[0].execution_tag()
1900 params = _autoserv_command_line(
1901 hostnames,
beepscb6f1e22013-06-28 19:14:10 -07001902 ['-P', execution_tag, '-n', '--verify_job_repo_url',
jamesrenc44ae992010-02-19 00:12:54 +00001903 _drone_manager.absolute_path(control_path)],
1904 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001905 if self.job.is_image_update_job():
1906 params += ['--image', self.job.update_image_path]
1907
jamesrenc44ae992010-02-19 00:12:54 +00001908 return params
showardd1195652009-12-08 22:21:02 +00001909
1910
1911 @property
1912 def num_processes(self):
1913 return len(self.queue_entries)
1914
1915
1916 @property
1917 def owner_username(self):
1918 return self.job.owner
1919
1920
1921 def _working_directory(self):
1922 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001923
1924
jadmanski0afbb632008-06-06 21:10:57 +00001925 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001926 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001927 keyval_dict = self.job.keyval_dict()
1928 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001929 group_name = self.queue_entries[0].get_group_name()
1930 if group_name:
1931 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001932 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001933 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001934 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001935 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001936
1937
showard35162b02009-03-03 02:17:30 +00001938 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001939 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001940 _drone_manager.write_lines_to_file(error_file_path,
1941 [_LOST_PROCESS_ERROR])
1942
1943
showardd3dc1992009-04-22 21:01:40 +00001944 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001945 if not self.monitor:
1946 return
1947
showardd9205182009-04-27 20:09:55 +00001948 self._write_job_finished()
1949
showard35162b02009-03-03 02:17:30 +00001950 if self.monitor.lost_process:
1951 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001952
jadmanskif7fa2cc2008-10-01 14:13:23 +00001953
showardcbd74612008-11-19 21:42:02 +00001954 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001955 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001956 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001957 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001958 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001959
1960
jadmanskif7fa2cc2008-10-01 14:13:23 +00001961 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001962 if not self.monitor or not self.monitor.has_process():
1963 return
1964
jadmanskif7fa2cc2008-10-01 14:13:23 +00001965 # build up sets of all the aborted_by and aborted_on values
1966 aborted_by, aborted_on = set(), set()
1967 for queue_entry in self.queue_entries:
1968 if queue_entry.aborted_by:
1969 aborted_by.add(queue_entry.aborted_by)
1970 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1971 aborted_on.add(t)
1972
1973 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001974 # TODO(showard): this conditional is now obsolete, we just need to leave
1975 # it in temporarily for backwards compatibility over upgrades. delete
1976 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001977 assert len(aborted_by) <= 1
1978 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001979 aborted_by_value = aborted_by.pop()
1980 aborted_on_value = max(aborted_on)
1981 else:
1982 aborted_by_value = 'autotest_system'
1983 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001984
showarda0382352009-02-11 23:36:43 +00001985 self._write_keyval_after_job("aborted_by", aborted_by_value)
1986 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001987
showardcbd74612008-11-19 21:42:02 +00001988 aborted_on_string = str(datetime.datetime.fromtimestamp(
1989 aborted_on_value))
1990 self._write_status_comment('Job aborted by %s on %s' %
1991 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001992
1993
jadmanski0afbb632008-06-06 21:10:57 +00001994 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001995 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001996 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001997 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001998
1999
jadmanski0afbb632008-06-06 21:10:57 +00002000 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00002001 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002002 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002003
2004
2005class QueueTask(AbstractQueueTask):
2006 def __init__(self, queue_entries):
2007 super(QueueTask, self).__init__(queue_entries)
2008 self._set_ids(queue_entries=queue_entries)
2009
2010
2011 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002012 self._check_queue_entry_statuses(
2013 self.queue_entries,
2014 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
2015 models.HostQueueEntry.Status.RUNNING),
2016 allowed_host_statuses=(models.Host.Status.PENDING,
2017 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00002018
2019 super(QueueTask, self).prolog()
2020
2021 for queue_entry in self.queue_entries:
2022 self._write_host_keyvals(queue_entry.host)
2023 queue_entry.host.set_status(models.Host.Status.RUNNING)
2024 queue_entry.host.update_field('dirty', 1)
showarda9545c02009-12-18 22:44:26 +00002025
2026
2027 def _finish_task(self):
2028 super(QueueTask, self)._finish_task()
2029
2030 for queue_entry in self.queue_entries:
2031 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00002032 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00002033
2034
mbligh4608b002010-01-05 18:22:35 +00002035class HostlessQueueTask(AbstractQueueTask):
2036 def __init__(self, queue_entry):
2037 super(HostlessQueueTask, self).__init__([queue_entry])
2038 self.queue_entry_ids = [queue_entry.id]
2039
2040
2041 def prolog(self):
2042 self.queue_entries[0].update_field('execution_subdir', 'hostless')
2043 super(HostlessQueueTask, self).prolog()
2044
2045
mbligh4608b002010-01-05 18:22:35 +00002046 def _finish_task(self):
2047 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00002048 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00002049
2050
showardd3dc1992009-04-22 21:01:40 +00002051class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00002052 def __init__(self, queue_entries, log_file_name):
2053 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00002054
showardd1195652009-12-08 22:21:02 +00002055 self.queue_entries = queue_entries
2056
showardd3dc1992009-04-22 21:01:40 +00002057 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00002058 self._autoserv_monitor.attach_to_existing_process(
2059 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00002060
showardd1195652009-12-08 22:21:02 +00002061
2062 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00002063 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00002064 return 'true'
2065 return self._generate_command(
2066 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00002067
2068
2069 def _generate_command(self, results_dir):
2070 raise NotImplementedError('Subclasses must override this')
2071
2072
showardd1195652009-12-08 22:21:02 +00002073 @property
2074 def owner_username(self):
2075 return self.queue_entries[0].job.owner
2076
2077
2078 def _working_directory(self):
2079 return self._get_consistent_execution_path(self.queue_entries)
2080
2081
2082 def _paired_with_monitor(self):
2083 return self._autoserv_monitor
2084
2085
showardd3dc1992009-04-22 21:01:40 +00002086 def _job_was_aborted(self):
2087 was_aborted = None
showardd1195652009-12-08 22:21:02 +00002088 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002089 queue_entry.update_from_database()
2090 if was_aborted is None: # first queue entry
2091 was_aborted = bool(queue_entry.aborted)
2092 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
jamesren17cadd62010-06-16 23:26:55 +00002093 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
2094 for entry in self.queue_entries]
showardd3dc1992009-04-22 21:01:40 +00002095 email_manager.manager.enqueue_notify_email(
jamesren17cadd62010-06-16 23:26:55 +00002096 'Inconsistent abort state',
2097 'Queue entries have inconsistent abort state:\n' +
2098 '\n'.join(entries))
showardd3dc1992009-04-22 21:01:40 +00002099 # don't crash here, just assume true
2100 return True
2101 return was_aborted
2102
2103
showardd1195652009-12-08 22:21:02 +00002104 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002105 if self._job_was_aborted():
2106 return models.HostQueueEntry.Status.ABORTED
2107
2108 # we'll use a PidfileRunMonitor to read the autoserv exit status
2109 if self._autoserv_monitor.exit_code() == 0:
2110 return models.HostQueueEntry.Status.COMPLETED
2111 return models.HostQueueEntry.Status.FAILED
2112
2113
showardd3dc1992009-04-22 21:01:40 +00002114 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002115 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002116 queue_entry.set_status(status)
2117
2118
2119 def abort(self):
2120 # override AgentTask.abort() to avoid killing the process and ending
2121 # the task. post-job tasks continue when the job is aborted.
2122 pass
2123
2124
mbligh4608b002010-01-05 18:22:35 +00002125 def _pidfile_label(self):
2126 # '.autoserv_execute' -> 'autoserv'
2127 return self._pidfile_name()[1:-len('_execute')]
2128
2129
showard9bb960b2009-11-19 01:02:11 +00002130class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002131 """
2132 Task responsible for
2133 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2134 * copying logs to the results repository
2135 * spawning CleanupTasks for hosts, if necessary
2136 * spawning a FinalReparseTask for the job
2137 """
showardd1195652009-12-08 22:21:02 +00002138 def __init__(self, queue_entries, recover_run_monitor=None):
2139 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002140 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002141 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002142 self._set_ids(queue_entries=queue_entries)
2143
2144
Aviv Keshet308e7362013-05-21 14:43:16 -07002145 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd3dc1992009-04-22 21:01:40 +00002146 def _generate_command(self, results_dir):
2147 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002148 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00002149 return [_autoserv_path, '-p',
2150 '--pidfile-label=%s' % self._pidfile_label(),
2151 '--use-existing-results', '--collect-crashinfo',
2152 '-m', host_list, '-r', results_dir]
showardd3dc1992009-04-22 21:01:40 +00002153
2154
showardd1195652009-12-08 22:21:02 +00002155 @property
2156 def num_processes(self):
2157 return len(self.queue_entries)
2158
2159
2160 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002161 return drone_manager.CRASHINFO_PID_FILE
showardd1195652009-12-08 22:21:02 +00002162
2163
showardd3dc1992009-04-22 21:01:40 +00002164 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002165 self._check_queue_entry_statuses(
2166 self.queue_entries,
2167 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
2168 allowed_host_statuses=(models.Host.Status.RUNNING,))
showard8cc058f2009-09-08 16:26:33 +00002169
showardd3dc1992009-04-22 21:01:40 +00002170 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002171
2172
showardd3dc1992009-04-22 21:01:40 +00002173 def epilog(self):
2174 super(GatherLogsTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002175 self._parse_results(self.queue_entries)
showard9bb960b2009-11-19 01:02:11 +00002176 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002177
showard9bb960b2009-11-19 01:02:11 +00002178
2179 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002180 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002181 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002182 models.HostQueueEntry.Status.COMPLETED)
2183 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2184 else:
2185 final_success = False
2186 num_tests_failed = 0
showard9bb960b2009-11-19 01:02:11 +00002187 reboot_after = self._job.reboot_after
2188 do_reboot = (
2189 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002190 self._final_status() == models.HostQueueEntry.Status.ABORTED
jamesrendd855242010-03-02 22:23:44 +00002191 or reboot_after == model_attributes.RebootAfter.ALWAYS
2192 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
Dan Shi07e09af2013-04-12 09:31:29 -07002193 and final_success and num_tests_failed == 0)
2194 or num_tests_failed > 0)
showard9bb960b2009-11-19 01:02:11 +00002195
showardd1195652009-12-08 22:21:02 +00002196 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002197 if do_reboot:
2198 # don't pass the queue entry to the CleanupTask. if the cleanup
2199 # fails, the job doesn't care -- it's over.
2200 models.SpecialTask.objects.create(
2201 host=models.Host.objects.get(id=queue_entry.host.id),
2202 task=models.SpecialTask.Task.CLEANUP,
2203 requested_by=self._job.owner_model())
2204 else:
2205 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002206
2207
showard0bbfc212009-04-29 21:06:13 +00002208 def run(self):
showard597bfd32009-05-08 18:22:50 +00002209 autoserv_exit_code = self._autoserv_monitor.exit_code()
2210 # only run if Autoserv exited due to some signal. if we have no exit
2211 # code, assume something bad (and signal-like) happened.
2212 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002213 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002214 else:
2215 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002216
2217
mbligh4608b002010-01-05 18:22:35 +00002218class SelfThrottledPostJobTask(PostJobTask):
2219 """
2220 Special AgentTask subclass that maintains its own global process limit.
2221 """
2222 _num_running_processes = 0
showarded2afea2009-07-07 20:54:07 +00002223
2224
mbligh4608b002010-01-05 18:22:35 +00002225 @classmethod
2226 def _increment_running_processes(cls):
2227 cls._num_running_processes += 1
mbligh16c722d2008-03-05 00:58:44 +00002228
mblighd5c95802008-03-05 00:33:46 +00002229
mbligh4608b002010-01-05 18:22:35 +00002230 @classmethod
2231 def _decrement_running_processes(cls):
2232 cls._num_running_processes -= 1
showard8cc058f2009-09-08 16:26:33 +00002233
2234
mbligh4608b002010-01-05 18:22:35 +00002235 @classmethod
2236 def _max_processes(cls):
2237 raise NotImplementedError
2238
2239
2240 @classmethod
2241 def _can_run_new_process(cls):
2242 return cls._num_running_processes < cls._max_processes()
2243
2244
2245 def _process_started(self):
2246 return bool(self.monitor)
2247
2248
2249 def tick(self):
2250 # override tick to keep trying to start until the process count goes
2251 # down and we can, at which point we revert to default behavior
2252 if self._process_started():
2253 super(SelfThrottledPostJobTask, self).tick()
2254 else:
2255 self._try_starting_process()
2256
2257
2258 def run(self):
2259 # override run() to not actually run unless we can
2260 self._try_starting_process()
2261
2262
2263 def _try_starting_process(self):
2264 if not self._can_run_new_process():
showard775300b2009-09-09 15:30:50 +00002265 return
2266
mbligh4608b002010-01-05 18:22:35 +00002267 # actually run the command
2268 super(SelfThrottledPostJobTask, self).run()
jamesren25663562010-04-27 18:00:55 +00002269 if self._process_started():
2270 self._increment_running_processes()
mblighd5c95802008-03-05 00:33:46 +00002271
mblighd5c95802008-03-05 00:33:46 +00002272
mbligh4608b002010-01-05 18:22:35 +00002273 def finished(self, success):
2274 super(SelfThrottledPostJobTask, self).finished(success)
2275 if self._process_started():
2276 self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002277
showard21baa452008-10-21 00:08:39 +00002278
mbligh4608b002010-01-05 18:22:35 +00002279class FinalReparseTask(SelfThrottledPostJobTask):
showardd1195652009-12-08 22:21:02 +00002280 def __init__(self, queue_entries):
2281 super(FinalReparseTask, self).__init__(queue_entries,
2282 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002283 # don't use _set_ids, since we don't want to set the host_ids
2284 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002285
2286
2287 def _generate_command(self, results_dir):
mbligh4608b002010-01-05 18:22:35 +00002288 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd1195652009-12-08 22:21:02 +00002289 results_dir]
2290
2291
2292 @property
2293 def num_processes(self):
2294 return 0 # don't include parser processes in accounting
2295
2296
2297 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002298 return drone_manager.PARSER_PID_FILE
showardd1195652009-12-08 22:21:02 +00002299
2300
showard97aed502008-11-04 02:01:24 +00002301 @classmethod
mbligh4608b002010-01-05 18:22:35 +00002302 def _max_processes(cls):
2303 return scheduler_config.config.max_parse_processes
showard97aed502008-11-04 02:01:24 +00002304
2305
2306 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002307 self._check_queue_entry_statuses(
2308 self.queue_entries,
2309 allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
showard8cc058f2009-09-08 16:26:33 +00002310
showard97aed502008-11-04 02:01:24 +00002311 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002312
2313
2314 def epilog(self):
2315 super(FinalReparseTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002316 self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002317
2318
mbligh4608b002010-01-05 18:22:35 +00002319class ArchiveResultsTask(SelfThrottledPostJobTask):
showarde1575b52010-01-15 00:21:12 +00002320 _ARCHIVING_FAILED_FILE = '.archiver_failed'
2321
mbligh4608b002010-01-05 18:22:35 +00002322 def __init__(self, queue_entries):
2323 super(ArchiveResultsTask, self).__init__(queue_entries,
2324 log_file_name='.archiving.log')
2325 # don't use _set_ids, since we don't want to set the host_ids
2326 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00002327
2328
mbligh4608b002010-01-05 18:22:35 +00002329 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002330 return drone_manager.ARCHIVER_PID_FILE
showard97aed502008-11-04 02:01:24 +00002331
2332
Aviv Keshet308e7362013-05-21 14:43:16 -07002333 # TODO: Refactor into autoserv_utils. crbug.com/243090
mbligh4608b002010-01-05 18:22:35 +00002334 def _generate_command(self, results_dir):
2335 return [_autoserv_path, '-p',
2336 '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
mblighe0cbc912010-03-11 18:03:07 +00002337 '--use-existing-results', '--control-filename=control.archive',
showard948eb302010-01-15 00:16:20 +00002338 os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
2339 'archive_results.control.srv')]
showard97aed502008-11-04 02:01:24 +00002340
2341
mbligh4608b002010-01-05 18:22:35 +00002342 @classmethod
2343 def _max_processes(cls):
2344 return scheduler_config.config.max_transfer_processes
showarda9545c02009-12-18 22:44:26 +00002345
2346
2347 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002348 self._check_queue_entry_statuses(
2349 self.queue_entries,
2350 allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
2351
2352 super(ArchiveResultsTask, self).prolog()
showarda9545c02009-12-18 22:44:26 +00002353
2354
mbligh4608b002010-01-05 18:22:35 +00002355 def epilog(self):
2356 super(ArchiveResultsTask, self).epilog()
showard4076c632010-01-15 20:28:49 +00002357 if not self.success and self._paired_with_monitor().has_process():
showarde1575b52010-01-15 00:21:12 +00002358 failed_file = os.path.join(self._working_directory(),
2359 self._ARCHIVING_FAILED_FILE)
2360 paired_process = self._paired_with_monitor().get_process()
2361 _drone_manager.write_lines_to_file(
2362 failed_file, ['Archiving failed with exit code %s'
2363 % self.monitor.exit_code()],
2364 paired_with_process=paired_process)
mbligh4608b002010-01-05 18:22:35 +00002365 self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002366
2367
mbligh36768f02008-02-22 18:28:33 +00002368if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002369 main()