#!/usr/bin/python -u
#pylint: disable-msg=C0111

"""
Autotest scheduler
"""


import datetime, optparse, os, signal
import sys, time, traceback, urllib
import logging, gc

import common
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.server import autoserv_utils
from autotest_lib.site_utils.graphite import stats

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_directory = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server')
_autoserv_path = os.path.join(_autoserv_directory, 'autoserv')
_testing_mode = False
_drone_manager = None

def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60


def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
            logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)


class BaseDispatcher(object):


    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def _log_extra_msg(self, msg):
        if self._extra_debugging:
            logging.debug(msg)


    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))

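    # Startup recovery: rebuild agent tasks for work that was already in
    # flight when the scheduler restarted, re-attach to their autoserv
    # processes via the recorded pidfiles, and flag any orphaned autoserv
    # processes that remain.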
    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask,
                                      ResetTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY,
                               models.SpecialTask.Task.RESET]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where='locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)



    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='afe_jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)

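    # Hand out pending (Queued) entries for this tick: hostless jobs start
    # immediately, unassigned atomic-group entries are expanded across a host
    # group, and everything else is passed to the host scheduler for a host
    # assignment.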
    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        logging.debug('Processing %d queue_entries', len(queue_entries))
        for queue_entry in queue_entries:
            self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._log_extra_msg('Scheduling hostless job.')
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)


    def _schedule_running_host_queue_entries(self):
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
                            queue_entry)
        queue_entry.schedule_pre_job_tasks()

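    # Abort handling: any entry marked aborted but not yet complete has its
    # agents aborted, and each affected job is stopped if necessary.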
    def _find_aborting(self):
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True

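    # Drive every Agent one step. New agents are started only while the
    # per-cycle and total process limits checked in _can_start_agent()
    # allow it; agents that report is_done() are removed.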
jadmanski0afbb632008-06-06 21:10:57 +0000814 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000815 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000816 have_reached_limit = False
817 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -0700818 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +0000819 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -0700820 self._log_extra_msg('Processing Agent with Host Ids: %s and '
821 'queue_entry ids:%s' % (agent.host_ids,
822 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +0000823 if not agent.started:
showard324bf812009-01-20 23:23:38 +0000824 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000825 have_reached_limit):
826 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -0700827 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +0000828 continue
showardd1195652009-12-08 22:21:02 +0000829 num_started_this_cycle += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -0700830 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +0000831 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -0700832 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +0000833 if agent.is_done():
Simran Basidef92872012-09-20 13:34:34 -0700834 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +0000835 self.remove_agent(agent)
Simran Basi3f6717d2012-09-13 15:21:22 -0700836 logging.info('%d running processes. %d added this cycle.',
837 _drone_manager.total_running_processes(),
838 num_started_this_cycle)
mbligh36768f02008-02-22 18:28:33 +0000839
840
showard29f7cd22009-04-29 21:16:24 +0000841 def _process_recurring_runs(self):
842 recurring_runs = models.RecurringRun.objects.filter(
843 start_date__lte=datetime.datetime.now())
844 for rrun in recurring_runs:
845 # Create job from template
846 job = rrun.job
847 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +0000848 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +0000849
850 host_objects = info['hosts']
851 one_time_hosts = info['one_time_hosts']
852 metahost_objects = info['meta_hosts']
853 dependencies = info['dependencies']
854 atomic_group = info['atomic_group']
855
856 for host in one_time_hosts or []:
857 this_host = models.Host.create_one_time_host(host.hostname)
858 host_objects.append(this_host)
859
860 try:
861 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +0000862 options=options,
showard29f7cd22009-04-29 21:16:24 +0000863 host_objects=host_objects,
864 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +0000865 atomic_group=atomic_group)
866
867 except Exception, ex:
868 logging.exception(ex)
869 #TODO send email
870
871 if rrun.loop_count == 1:
872 rrun.delete()
873 else:
874 if rrun.loop_count != 0: # if not infinite loop
875 # calculate new start_date
876 difference = datetime.timedelta(seconds=rrun.loop_period)
877 rrun.start_date = rrun.start_date + difference
878 rrun.loop_count -= 1
879 rrun.save()
880
881
Simran Basia858a232012-08-21 11:04:37 -0700882SiteDispatcher = utils.import_site_class(
883 __file__, 'autotest_lib.scheduler.site_monitor_db',
884 'SiteDispatcher', BaseDispatcher)
885
886class Dispatcher(SiteDispatcher):
887 pass
888
889
showard170873e2009-01-07 00:22:26 +0000890class PidfileRunMonitor(object):
891 """
892 Client must call either run() to start a new process or
893 attach_to_existing_process().
894 """
mbligh36768f02008-02-22 18:28:33 +0000895
showard170873e2009-01-07 00:22:26 +0000896 class _PidfileException(Exception):
897 """
898 Raised when there's some unexpected behavior with the pid file, but only
899 used internally (never allowed to escape this class).
900 """
mbligh36768f02008-02-22 18:28:33 +0000901
902
showard170873e2009-01-07 00:22:26 +0000903 def __init__(self):
showard35162b02009-03-03 02:17:30 +0000904 self.lost_process = False
showard170873e2009-01-07 00:22:26 +0000905 self._start_time = None
906 self.pidfile_id = None
907 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +0000908
909
showard170873e2009-01-07 00:22:26 +0000910 def _add_nice_command(self, command, nice_level):
911 if not nice_level:
912 return command
913 return ['nice', '-n', str(nice_level)] + command
914
915
916 def _set_start_time(self):
917 self._start_time = time.time()
918
919
showard418785b2009-11-23 20:19:59 +0000920 def run(self, command, working_directory, num_processes, nice_level=None,
921 log_file=None, pidfile_name=None, paired_with_pidfile=None,
jamesren76fcf192010-04-21 20:39:50 +0000922 username=None, drone_hostnames_allowed=None):
showard170873e2009-01-07 00:22:26 +0000923 assert command is not None
924 if nice_level is not None:
925 command = ['nice', '-n', str(nice_level)] + command
926 self._set_start_time()
927 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +0000928 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +0000929 num_processes=num_processes, log_file=log_file,
jamesren76fcf192010-04-21 20:39:50 +0000930 paired_with_pidfile=paired_with_pidfile, username=username,
931 drone_hostnames_allowed=drone_hostnames_allowed)
showard170873e2009-01-07 00:22:26 +0000932
933
showarded2afea2009-07-07 20:54:07 +0000934 def attach_to_existing_process(self, execution_path,
jamesrenc44ae992010-02-19 00:12:54 +0000935 pidfile_name=drone_manager.AUTOSERV_PID_FILE,
showardd1195652009-12-08 22:21:02 +0000936 num_processes=None):
showard170873e2009-01-07 00:22:26 +0000937 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +0000938 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +0000939 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +0000940 if num_processes is not None:
941 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +0000942
943
jadmanski0afbb632008-06-06 21:10:57 +0000944 def kill(self):
showard170873e2009-01-07 00:22:26 +0000945 if self.has_process():
946 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +0000947
mbligh36768f02008-02-22 18:28:33 +0000948
showard170873e2009-01-07 00:22:26 +0000949 def has_process(self):
showard21baa452008-10-21 00:08:39 +0000950 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +0000951 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +0000952
953
showard170873e2009-01-07 00:22:26 +0000954 def get_process(self):
showard21baa452008-10-21 00:08:39 +0000955 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +0000956 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +0000957 return self._state.process
mblighbb421852008-03-11 22:36:16 +0000958
959
showard170873e2009-01-07 00:22:26 +0000960 def _read_pidfile(self, use_second_read=False):
961 assert self.pidfile_id is not None, (
962 'You must call run() or attach_to_existing_process()')
963 contents = _drone_manager.get_pidfile_contents(
964 self.pidfile_id, use_second_read=use_second_read)
965 if contents.is_invalid():
966 self._state = drone_manager.PidfileContents()
967 raise self._PidfileException(contents)
968 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +0000969
970
showard21baa452008-10-21 00:08:39 +0000971 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +0000972 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
973 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +0000974 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +0000975 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +0000976
977
978 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +0000979 if self.lost_process:
showard21baa452008-10-21 00:08:39 +0000980 return
mblighbb421852008-03-11 22:36:16 +0000981
showard21baa452008-10-21 00:08:39 +0000982 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +0000983
showard170873e2009-01-07 00:22:26 +0000984 if self._state.process is None:
985 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +0000986 return
mbligh90a549d2008-03-25 23:52:34 +0000987
showard21baa452008-10-21 00:08:39 +0000988 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +0000989 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +0000990 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +0000991 return
mbligh90a549d2008-03-25 23:52:34 +0000992
showard170873e2009-01-07 00:22:26 +0000993 # pid but no running process - maybe process *just* exited
994 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +0000995 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +0000996 # autoserv exited without writing an exit code
997 # to the pidfile
showard21baa452008-10-21 00:08:39 +0000998 self._handle_pidfile_error(
999 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001000
showard21baa452008-10-21 00:08:39 +00001001
1002 def _get_pidfile_info(self):
1003 """\
1004 After completion, self._state will contain:
1005 pid=None, exit_status=None if autoserv has not yet run
1006 pid!=None, exit_status=None if autoserv is running
1007 pid!=None, exit_status!=None if autoserv has completed
1008 """
1009 try:
1010 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001011 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001012 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001013
1014
showard170873e2009-01-07 00:22:26 +00001015 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001016 """\
1017 Called when no pidfile is found or no pid is in the pidfile.
1018 """
showard170873e2009-01-07 00:22:26 +00001019 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001020 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001021 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001022 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001023 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001024
1025
showard35162b02009-03-03 02:17:30 +00001026 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001027 """\
1028 Called when autoserv has exited without writing an exit status,
1029 or we've timed out waiting for autoserv to write a pid to the
1030 pidfile. In either case, we just return failure and the caller
1031 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001032
showard170873e2009-01-07 00:22:26 +00001033 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001034 """
1035 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001036 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001037 self._state.exit_status = 1
1038 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001039
1040
jadmanski0afbb632008-06-06 21:10:57 +00001041 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001042 self._get_pidfile_info()
1043 return self._state.exit_status
1044
1045
1046 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001047 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001048 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001049 if self._state.num_tests_failed is None:
1050 return -1
showard21baa452008-10-21 00:08:39 +00001051 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001052
1053
showardcdaeae82009-08-31 18:32:48 +00001054 def try_copy_results_on_drone(self, **kwargs):
1055 if self.has_process():
1056 # copy results logs into the normal place for job results
1057 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1058
1059
1060 def try_copy_to_results_repository(self, source, **kwargs):
1061 if self.has_process():
1062 _drone_manager.copy_to_results_repository(self.get_process(),
1063 source, **kwargs)
1064
1065
mbligh36768f02008-02-22 18:28:33 +00001066class Agent(object):
showard77182562009-06-10 00:16:05 +00001067 """
showard8cc058f2009-09-08 16:26:33 +00001068 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001069
1070 The following methods are required on all task objects:
1071 poll() - Called periodically to let the task check its status and
1072 update its internal state. If the task succeeded.
1073 is_done() - Returns True if the task is finished.
1074 abort() - Called when an abort has been requested. The task must
1075 set its aborted attribute to True if it actually aborted.
1076
1077 The following attributes are required on all task objects:
1078 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001079 success - bool, True if this task succeeded.
1080 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1081 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001082 """
1083
1084
showard418785b2009-11-23 20:19:59 +00001085 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001086 """
showard8cc058f2009-09-08 16:26:33 +00001087 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001088 """
showard8cc058f2009-09-08 16:26:33 +00001089 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001090
showard77182562009-06-10 00:16:05 +00001091 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001092 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001093
showard8cc058f2009-09-08 16:26:33 +00001094 self.queue_entry_ids = task.queue_entry_ids
1095 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001096
showard8cc058f2009-09-08 16:26:33 +00001097 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001098 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001099
1100
jadmanski0afbb632008-06-06 21:10:57 +00001101 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001102 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001103 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001104 self.task.poll()
1105 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001106 self.finished = True
showardec113162008-05-08 00:52:49 +00001107
1108
jadmanski0afbb632008-06-06 21:10:57 +00001109 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001110 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001111
1112
showardd3dc1992009-04-22 21:01:40 +00001113 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001114 if self.task:
1115 self.task.abort()
1116 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001117 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001118 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001119
showardd3dc1992009-04-22 21:01:40 +00001120
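# A minimal sketch (hypothetical, for illustration only) of an object that
# satisfies the task interface described in the Agent docstring above:
# poll()/is_done()/abort() plus the aborted, success, queue_entry_ids and
# host_ids attributes. Real tasks are the AgentTask subclasses defined below.
class _ExampleNoopTask(object):
    def __init__(self, queue_entry_ids=(), host_ids=()):
        self.success = None
        self.aborted = False
        self.queue_entry_ids = list(queue_entry_ids)
        self.host_ids = list(host_ids)
        self._done = False


    def poll(self):
        # a real task would check its process monitor here; this one simply
        # succeeds on the first poll
        self._done = True
        self.success = True


    def is_done(self):
        return self._done


    def abort(self):
        self.aborted = True
        self._done = True


# The dispatcher would drive it as agent = Agent(_ExampleNoopTask()), calling
# agent.tick() once per scheduler cycle until agent.is_done() returns True.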
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001121class BaseAgentTask(object):
showardd1195652009-12-08 22:21:02 +00001122 class _NullMonitor(object):
1123 pidfile_id = None
1124
1125 def has_process(self):
1126 return True
1127
1128
1129 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001130 """
showardd1195652009-12-08 22:21:02 +00001131 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001132 """
jadmanski0afbb632008-06-06 21:10:57 +00001133 self.done = False
showardd1195652009-12-08 22:21:02 +00001134 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001135 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001136 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001137 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001138 self.queue_entry_ids = []
1139 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001140 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001141
1142
1143 def _set_ids(self, host=None, queue_entries=None):
1144 if queue_entries and queue_entries != [None]:
1145 self.host_ids = [entry.host.id for entry in queue_entries]
1146 self.queue_entry_ids = [entry.id for entry in queue_entries]
1147 else:
1148 assert host
1149 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001150
1151
jadmanski0afbb632008-06-06 21:10:57 +00001152 def poll(self):
showard08a36412009-05-05 01:01:13 +00001153 if not self.started:
1154 self.start()
showardd1195652009-12-08 22:21:02 +00001155 if not self.done:
1156 self.tick()
showard08a36412009-05-05 01:01:13 +00001157
1158
1159 def tick(self):
showardd1195652009-12-08 22:21:02 +00001160 assert self.monitor
1161 exit_code = self.monitor.exit_code()
1162 if exit_code is None:
1163 return
mbligh36768f02008-02-22 18:28:33 +00001164
showardd1195652009-12-08 22:21:02 +00001165 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001166 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001167
1168
jadmanski0afbb632008-06-06 21:10:57 +00001169 def is_done(self):
1170 return self.done
mbligh36768f02008-02-22 18:28:33 +00001171
1172
jadmanski0afbb632008-06-06 21:10:57 +00001173 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001174 if self.done:
showardd1195652009-12-08 22:21:02 +00001175 assert self.started
showard08a36412009-05-05 01:01:13 +00001176 return
showardd1195652009-12-08 22:21:02 +00001177 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001178 self.done = True
1179 self.success = success
1180 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001181
1182
jadmanski0afbb632008-06-06 21:10:57 +00001183 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001184 """
1185 To be overridden.
1186 """
showarded2afea2009-07-07 20:54:07 +00001187 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001188 self.register_necessary_pidfiles()
1189
1190
1191 def _log_file(self):
1192 if not self._log_file_name:
1193 return None
1194 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001195
mbligh36768f02008-02-22 18:28:33 +00001196
jadmanski0afbb632008-06-06 21:10:57 +00001197 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001198 log_file = self._log_file()
1199 if self.monitor and log_file:
1200 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001201
1202
jadmanski0afbb632008-06-06 21:10:57 +00001203 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001204 """
1205 To be overridden.
1206 """
jadmanski0afbb632008-06-06 21:10:57 +00001207 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001208 logging.info("%s finished with success=%s", type(self).__name__,
1209 self.success)
1210
mbligh36768f02008-02-22 18:28:33 +00001211
1212
jadmanski0afbb632008-06-06 21:10:57 +00001213 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001214 if not self.started:
1215 self.prolog()
1216 self.run()
1217
1218 self.started = True
1219
1220
1221 def abort(self):
1222 if self.monitor:
1223 self.monitor.kill()
1224 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001225 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001226 self.cleanup()
1227
1228
showarded2afea2009-07-07 20:54:07 +00001229 def _get_consistent_execution_path(self, execution_entries):
1230 first_execution_path = execution_entries[0].execution_path()
1231 for execution_entry in execution_entries[1:]:
1232 assert execution_entry.execution_path() == first_execution_path, (
1233 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1234 execution_entry,
1235 first_execution_path,
1236 execution_entries[0]))
1237 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001238
1239
showarded2afea2009-07-07 20:54:07 +00001240 def _copy_results(self, execution_entries, use_monitor=None):
1241 """
1242 @param execution_entries: list of objects with execution_path() method
1243 """
showard6d1c1432009-08-20 23:30:39 +00001244 if use_monitor is not None and not use_monitor.has_process():
1245 return
1246
showarded2afea2009-07-07 20:54:07 +00001247 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001248 if use_monitor is None:
1249 assert self.monitor
1250 use_monitor = self.monitor
1251 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001252 execution_path = self._get_consistent_execution_path(execution_entries)
1253 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001254 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001255
showarda1e74b32009-05-12 17:32:04 +00001256
1257 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001258 for queue_entry in queue_entries:
1259 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001260
1261
mbligh4608b002010-01-05 18:22:35 +00001262 def _archive_results(self, queue_entries):
1263 for queue_entry in queue_entries:
1264 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001265
1266
showardd1195652009-12-08 22:21:02 +00001267 def _command_line(self):
1268 """
1269 Return the command line to run. Must be overridden.
1270 """
1271 raise NotImplementedError
1272
1273
1274 @property
1275 def num_processes(self):
1276 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001277 Return the number of processes forked by this BaseAgentTask's process.
1278 It may only be approximate. To be overridden if necessary.
showardd1195652009-12-08 22:21:02 +00001279 """
1280 return 1
1281
1282
1283 def _paired_with_monitor(self):
1284 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001285 If this BaseAgentTask's process must run on the same machine as some
showardd1195652009-12-08 22:21:02 +00001286 previous process, this method should be overridden to return a
1287 PidfileRunMonitor for that process.
1288 """
1289 return self._NullMonitor()
1290
1291
1292 @property
1293 def owner_username(self):
1294 """
1295 Return login of user responsible for this task. May be None. Must be
1296 overridden.
1297 """
1298 raise NotImplementedError
1299
1300
1301 def _working_directory(self):
1302 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001303 Return the directory where this BaseAgentTask's process executes.
1304 Must be overridden.
showardd1195652009-12-08 22:21:02 +00001305 """
1306 raise NotImplementedError
1307
1308
1309 def _pidfile_name(self):
1310 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001311 Return the name of the pidfile this BaseAgentTask's process uses. To be
showardd1195652009-12-08 22:21:02 +00001312 overridden if necessary.
1313 """
jamesrenc44ae992010-02-19 00:12:54 +00001314 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001315
1316
1317 def _check_paired_results_exist(self):
1318 if not self._paired_with_monitor().has_process():
1319 email_manager.manager.enqueue_notify_email(
1320 'No paired results in task',
1321 'No paired results in task %s at %s'
1322 % (self, self._paired_with_monitor().pidfile_id))
1323 self.finished(False)
1324 return False
1325 return True
1326
1327
1328 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001329 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001330 self.monitor = PidfileRunMonitor()
1331
1332
1333 def run(self):
1334 if not self._check_paired_results_exist():
1335 return
1336
1337 self._create_monitor()
1338 self.monitor.run(
1339 self._command_line(), self._working_directory(),
1340 num_processes=self.num_processes,
1341 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1342 pidfile_name=self._pidfile_name(),
1343 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001344 username=self.owner_username,
1345 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1346
1347
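    # Drone-set resolution below, summarized: if drone sets are disabled there
    # is no restriction (None); a special task with no queue entries falls
    # back to its requesting user's drone set or the global default; otherwise
    # the job's drone set is used, falling back to the job owner's default and
    # then the global default.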
1348 def get_drone_hostnames_allowed(self):
1349 if not models.DroneSet.drone_sets_enabled():
1350 return None
1351
1352 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1353 if not hqes:
1354 # Only special tasks could be missing host queue entries
1355 assert isinstance(self, SpecialAgentTask)
1356 return self._user_or_global_default_drone_set(
1357 self.task, self.task.requested_by)
1358
1359 job_ids = hqes.values_list('job', flat=True).distinct()
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001360 assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
jamesren76fcf192010-04-21 20:39:50 +00001361 "span multiple jobs")
1362
1363 job = models.Job.objects.get(id=job_ids[0])
1364 drone_set = job.drone_set
1365 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001366 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001367
1368 return drone_set.get_drone_hostnames()
1369
1370
1371 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1372 """
1373 Returns the user's default drone set, if present.
1374
1375 Otherwise, returns the global default drone set.
1376 """
1377 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1378 if not user:
1379 logging.warn('%s had no owner; using default drone set',
1380 obj_with_owner)
1381 return default_hostnames
1382 if not user.drone_set:
1383 logging.warn('User %s has no default drone set, using global '
1384 'default', user.login)
1385 return default_hostnames
1386 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001387
1388
1389 def register_necessary_pidfiles(self):
1390 pidfile_id = _drone_manager.get_pidfile_id_from(
1391 self._working_directory(), self._pidfile_name())
1392 _drone_manager.register_pidfile(pidfile_id)
1393
1394 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1395 if paired_pidfile_id:
1396 _drone_manager.register_pidfile(paired_pidfile_id)
1397
1398
1399 def recover(self):
1400 if not self._check_paired_results_exist():
1401 return
1402
1403 self._create_monitor()
1404 self.monitor.attach_to_existing_process(
1405 self._working_directory(), pidfile_name=self._pidfile_name(),
1406 num_processes=self.num_processes)
1407 if not self.monitor.has_process():
1408 # no process to recover; wait to be started normally
1409 self.monitor = None
1410 return
1411
1412 self.started = True
Aviv Keshet225bdfe2013-03-05 10:10:08 -08001413 logging.info('Recovering process %s for %s at %s',
1414 self.monitor.get_process(), type(self).__name__,
1415 self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001416
1417
mbligh4608b002010-01-05 18:22:35 +00001418 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1419 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001420 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001421 for entry in queue_entries:
1422 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001423 raise host_scheduler.SchedulerError(
1424 '%s attempting to start entry with invalid status %s: '
1425 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001426 invalid_host_status = (
1427 allowed_host_statuses is not None
1428 and entry.host.status not in allowed_host_statuses)
1429 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001430 raise host_scheduler.SchedulerError(
1431 '%s attempting to start on queue entry with invalid '
1432 'host status %s: %s'
1433 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001434
1435
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001436SiteAgentTask = utils.import_site_class(
1437 __file__, 'autotest_lib.scheduler.site_monitor_db',
1438 'SiteAgentTask', BaseAgentTask)
1439
1440class AgentTask(SiteAgentTask):
1441 pass
1442
1443
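# A minimal sketch (hypothetical; the command and log file name are invented)
# of what a concrete AgentTask subclass overrides so that run() can hand a
# command to the drone manager: _command_line(), _working_directory() and
# owner_username. Pidfile registration, monitoring and log copying are
# inherited. Note that with drone sets enabled, a task without queue entries
# must be a SpecialAgentTask, so this class is purely illustrative.
class _ExampleEchoTask(AgentTask):
    def __init__(self, working_directory, username):
        super(_ExampleEchoTask, self).__init__(log_file_name='echo.log')
        self._dir = working_directory
        self._username = username


    def _command_line(self):
        # any argv list the drones can execute
        return ['echo', 'example task ran']


    def _working_directory(self):
        return self._dir


    @property
    def owner_username(self):
        return self._username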
showardd9205182009-04-27 20:09:55 +00001444class TaskWithJobKeyvals(object):
1445 """AgentTask mixin providing functionality to help with job keyval files."""
1446 _KEYVAL_FILE = 'keyval'
1447 def _format_keyval(self, key, value):
1448 return '%s=%s' % (key, value)
1449
1450
1451 def _keyval_path(self):
1452 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001453 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001454
1455
1456 def _write_keyval_after_job(self, field, value):
1457 assert self.monitor
1458 if not self.monitor.has_process():
1459 return
1460 _drone_manager.write_lines_to_file(
1461 self._keyval_path(), [self._format_keyval(field, value)],
1462 paired_with_process=self.monitor.get_process())
1463
1464
1465 def _job_queued_keyval(self, job):
1466 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1467
1468
1469 def _write_job_finished(self):
1470 self._write_keyval_after_job("job_finished", int(time.time()))
1471
1472
showarddb502762009-09-09 15:31:20 +00001473 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1474 keyval_contents = '\n'.join(self._format_keyval(key, value)
1475 for key, value in keyval_dict.iteritems())
1476 # always end with a newline to allow additional keyvals to be written
1477 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001478 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001479 keyval_contents,
1480 file_path=keyval_path)
1481
1482
1483 def _write_keyvals_before_job(self, keyval_dict):
1484 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1485
1486
1487 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001488 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001489 host.hostname)
1490 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001491 all_labels = [ urllib.quote(label) for label in all_labels ]
showarddb502762009-09-09 15:31:20 +00001492 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1493 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1494
1495
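# Keyval files written by the mixin above are plain text, one "key=value" per
# line and always newline-terminated so later stages can append. For example
# (values invented for illustration), a job-level keyval file may contain
# lines such as
#
#     job_queued=1268002512
#     job_finished=1268005673
#
# while a per-host keyval file written by _write_host_keyvals() holds the
# platform and a comma-separated list of URL-quoted labels, e.g.
#
#     platform=some_platform
#     labels=some_platform,wifi%20capable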
showard8cc058f2009-09-08 16:26:33 +00001496class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001497 """
1498 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1499 """
1500
1501 TASK_TYPE = None
1502 host = None
1503 queue_entry = None
1504
showardd1195652009-12-08 22:21:02 +00001505 def __init__(self, task, extra_command_args):
1506 super(SpecialAgentTask, self).__init__()
1507
lmrb7c5d272010-04-16 06:34:04 +00001508 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001509
jamesrenc44ae992010-02-19 00:12:54 +00001510 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001511 self.queue_entry = None
1512 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001513 self.queue_entry = scheduler_models.HostQueueEntry(
1514 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001515
showarded2afea2009-07-07 20:54:07 +00001516 self.task = task
1517 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001518
1519
showard8cc058f2009-09-08 16:26:33 +00001520 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001521 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1522
1523
1524 def _command_line(self):
1525 return _autoserv_command_line(self.host.hostname,
1526 self._extra_command_args,
1527 queue_entry=self.queue_entry)
1528
1529
1530 def _working_directory(self):
1531 return self.task.execution_path()
1532
1533
1534 @property
1535 def owner_username(self):
1536 if self.task.requested_by:
1537 return self.task.requested_by.login
1538 return None
showard8cc058f2009-09-08 16:26:33 +00001539
1540
showarded2afea2009-07-07 20:54:07 +00001541 def prolog(self):
1542 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001543 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001544 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001545
1546
showardde634ee2009-01-30 01:44:24 +00001547 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001548 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001549
showard2fe3f1d2009-07-06 20:19:11 +00001550 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001551 return # don't fail metahost entries, they'll be reassigned
1552
showard2fe3f1d2009-07-06 20:19:11 +00001553 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001554 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001555 return # entry has been aborted
1556
showard2fe3f1d2009-07-06 20:19:11 +00001557 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001558 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001559 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001560 self._write_keyval_after_job(queued_key, queued_time)
1561 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001562
showard8cc058f2009-09-08 16:26:33 +00001563 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001564 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001565 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001566 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001567
showard8cc058f2009-09-08 16:26:33 +00001568 pidfile_id = _drone_manager.get_pidfile_id_from(
1569 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001570 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001571 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001572
1573 if self.queue_entry.job.parse_failed_repair:
1574 self._parse_results([self.queue_entry])
1575 else:
1576 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001577
Alex Miller23676a22013-07-03 09:03:36 -07001578 # Also fail all other special tasks that have not yet run for this HQE
1579 pending_tasks = models.SpecialTask.objects.filter(
1580 queue_entry__id=self.queue_entry.id,
1581 is_complete=0)
1582 if pending_tasks:
1583 for task in pending_tasks:
1584 task.finish(False)
1585
showard8cc058f2009-09-08 16:26:33 +00001586
1587 def cleanup(self):
1588 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001589
1590 # We will consider an aborted task to be "Failed"
1591 self.task.finish(bool(self.success))
1592
showardf85a0b72009-10-07 20:48:45 +00001593 if self.monitor:
1594 if self.monitor.has_process():
1595 self._copy_results([self.task])
1596 if self.monitor.pidfile_id is not None:
1597 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001598
1599
Dan Shi07e09af2013-04-12 09:31:29 -07001600 def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
 1601        """Remove queued special tasks of a given type for this host.
1602
1603 @param special_task_to_remove: type of special task to be removed, e.g.,
1604 models.SpecialTask.Task.VERIFY.
 1605        @param keep_last_one: True to keep the last such task; the current
 1606                task (self.task) is excluded from the deletion.
1607
1608 """
1609 queued_special_tasks = models.SpecialTask.objects.filter(
1610 host__id=self.host.id,
1611 task=special_task_to_remove,
1612 is_active=False, is_complete=False, queue_entry=None)
1613 if keep_last_one:
1614 queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
1615 queued_special_tasks.delete()
1616
1617
showard8cc058f2009-09-08 16:26:33 +00001618class RepairTask(SpecialAgentTask):
1619 TASK_TYPE = models.SpecialTask.Task.REPAIR
1620
1621
showardd1195652009-12-08 22:21:02 +00001622 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001623 """\
1624 queue_entry: queue entry to mark failed if this repair fails.
1625 """
1626 protection = host_protections.Protection.get_string(
1627 task.host.protection)
1628 # normalize the protection name
1629 protection = host_protections.Protection.get_attr_name(protection)
1630
1631 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001632 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001633
1634 # *don't* include the queue entry in IDs -- if the queue entry is
1635 # aborted, we want to leave the repair task running
1636 self._set_ids(host=self.host)
1637
1638
1639 def prolog(self):
1640 super(RepairTask, self).prolog()
1641 logging.info("repair_task starting")
1642 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001643
1644
jadmanski0afbb632008-06-06 21:10:57 +00001645 def epilog(self):
1646 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001647
jadmanski0afbb632008-06-06 21:10:57 +00001648 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001649 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001650 else:
showard8cc058f2009-09-08 16:26:33 +00001651 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001652 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001653 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001654
1655
showarded2afea2009-07-07 20:54:07 +00001656class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001657 def _copy_to_results_repository(self):
1658 if not self.queue_entry or self.queue_entry.meta_host:
1659 return
1660
1661 self.queue_entry.set_execution_subdir()
1662 log_name = os.path.basename(self.task.execution_path())
1663 source = os.path.join(self.task.execution_path(), 'debug',
1664 'autoserv.DEBUG')
1665 destination = os.path.join(
1666 self.queue_entry.execution_path(), log_name)
1667
1668 self.monitor.try_copy_to_results_repository(
1669 source, destination_path=destination)
1670
1671
showard170873e2009-01-07 00:22:26 +00001672 def epilog(self):
1673 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001674
showard775300b2009-09-09 15:30:50 +00001675 if self.success:
1676 return
showard8fe93b52008-11-18 17:53:22 +00001677
showard775300b2009-09-09 15:30:50 +00001678 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001679
showard775300b2009-09-09 15:30:50 +00001680 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001681 # effectively ignore failure for these hosts
1682 self.success = True
showard775300b2009-09-09 15:30:50 +00001683 return
1684
1685 if self.queue_entry:
1686 self.queue_entry.requeue()
1687
1688 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001689 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001690 queue_entry__id=self.queue_entry.id):
1691 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1692 self._fail_queue_entry()
1693 return
1694
showard9bb960b2009-11-19 01:02:11 +00001695 queue_entry = models.HostQueueEntry.objects.get(
1696 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001697 else:
1698 queue_entry = None
1699
1700 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001701 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001702 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001703 queue_entry=queue_entry,
1704 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001705
showard8fe93b52008-11-18 17:53:22 +00001706
Alex Miller42437f92013-05-28 12:58:54 -07001707 def _should_pending(self):
1708 """
1709 Decide if we should call the host queue entry's on_pending method.
1710 We should if:
1711 1) There exists an associated host queue entry.
1712 2) The current special task completed successfully.
1713 3) There do not exist any more special tasks to be run before the
1714 host queue entry starts.
1715
 1716        @returns: True if we should call on_pending, False if not.
1717
1718 """
1719 if not self.queue_entry or not self.success:
1720 return False
1721
1722 # We know if this is the last one when we create it, so we could add
1723 # another column to the database to keep track of this information, but
1724 # I expect the overhead of querying here to be minimal.
1725 queue_entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1726 queued = models.SpecialTask.objects.filter(
1727 host__id=self.host.id, is_active=False,
1728 is_complete=False, queue_entry=queue_entry)
1729 queued = queued.exclude(id=self.task.id)
1730 return queued.count() == 0
1731
1732
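# Pre-job flow, summarized: Verify, Cleanup and Reset (below) run as
# SpecialTasks before a job's main autoserv process. On success,
# _should_pending() checks whether any other special tasks are still queued
# for the host queue entry; only when none remain does the entry advance via
# on_pending(). On failure (unless the host is protected with DO_NOT_VERIFY),
# the entry is requeued and a Repair task is scheduled for the host; a later
# failure with a Repair already tied to the same entry marks the host
# REPAIR_FAILED and fails the entry.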
showard8fe93b52008-11-18 17:53:22 +00001733class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001734 TASK_TYPE = models.SpecialTask.Task.VERIFY
1735
1736
showardd1195652009-12-08 22:21:02 +00001737 def __init__(self, task):
1738 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001739 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001740
1741
jadmanski0afbb632008-06-06 21:10:57 +00001742 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001743 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001744
showardb18134f2009-03-20 20:52:18 +00001745 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001746 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001747 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1748 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001749
jamesren42318f72010-05-10 23:40:59 +00001750 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00001751 # and there's no need to keep records of other requests.
Dan Shi07e09af2013-04-12 09:31:29 -07001752 self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
1753 keep_last_one=True)
showard2fe3f1d2009-07-06 20:19:11 +00001754
mbligh36768f02008-02-22 18:28:33 +00001755
jadmanski0afbb632008-06-06 21:10:57 +00001756 def epilog(self):
1757 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001758 if self.success:
Alex Miller42437f92013-05-28 12:58:54 -07001759 if self._should_pending():
showard8cc058f2009-09-08 16:26:33 +00001760 self.queue_entry.on_pending()
1761 else:
1762 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001763
1764
mbligh4608b002010-01-05 18:22:35 +00001765class CleanupTask(PreJobTask):
1766 # note this can also run post-job, but when it does, it's running standalone
1767 # against the host (not related to the job), so it's not considered a
1768 # PostJobTask
1769
1770 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1771
1772
1773 def __init__(self, task, recover_run_monitor=None):
1774 super(CleanupTask, self).__init__(task, ['--cleanup'])
1775 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1776
1777
1778 def prolog(self):
1779 super(CleanupTask, self).prolog()
1780 logging.info("starting cleanup task for host: %s", self.host.hostname)
1781 self.host.set_status(models.Host.Status.CLEANING)
1782 if self.queue_entry:
Dan Shi07e09af2013-04-12 09:31:29 -07001783 self.queue_entry.set_status(models.HostQueueEntry.Status.CLEANING)
mbligh4608b002010-01-05 18:22:35 +00001784
1785
1786 def _finish_epilog(self):
1787 if not self.queue_entry or not self.success:
1788 return
1789
1790 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
1791 should_run_verify = (
1792 self.queue_entry.job.run_verify
1793 and self.host.protection != do_not_verify_protection)
1794 if should_run_verify:
1795 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1796 models.SpecialTask.objects.create(
1797 host=models.Host.objects.get(id=self.host.id),
1798 queue_entry=entry,
1799 task=models.SpecialTask.Task.VERIFY)
1800 else:
Alex Miller42437f92013-05-28 12:58:54 -07001801 if self._should_pending():
1802 self.queue_entry.on_pending()
mbligh4608b002010-01-05 18:22:35 +00001803
1804
1805 def epilog(self):
1806 super(CleanupTask, self).epilog()
1807
1808 if self.success:
1809 self.host.update_field('dirty', 0)
1810 self.host.set_status(models.Host.Status.READY)
1811
1812 self._finish_epilog()
1813
1814
Dan Shi07e09af2013-04-12 09:31:29 -07001815class ResetTask(PreJobTask):
1816 """Task to reset a DUT, including cleanup and verify."""
1817 # note this can also run post-job, but when it does, it's running standalone
1818 # against the host (not related to the job), so it's not considered a
1819 # PostJobTask
1820
1821 TASK_TYPE = models.SpecialTask.Task.RESET
1822
1823
1824 def __init__(self, task, recover_run_monitor=None):
1825 super(ResetTask, self).__init__(task, ['--reset'])
1826 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1827
1828
1829 def prolog(self):
1830 super(ResetTask, self).prolog()
1831 logging.info('starting reset task for host: %s',
1832 self.host.hostname)
1833 self.host.set_status(models.Host.Status.RESETTING)
1834 if self.queue_entry:
1835 self.queue_entry.set_status(models.HostQueueEntry.Status.RESETTING)
1836
1837 # Delete any queued cleanups for this host.
1838 self.remove_special_tasks(models.SpecialTask.Task.CLEANUP,
1839 keep_last_one=False)
1840
1841 # Delete any queued reverifies for this host.
1842 self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
1843 keep_last_one=False)
1844
1845 # Only one reset is needed.
1846 self.remove_special_tasks(models.SpecialTask.Task.RESET,
1847 keep_last_one=True)
1848
1849
1850 def epilog(self):
1851 super(ResetTask, self).epilog()
1852
1853 if self.success:
1854 self.host.update_field('dirty', 0)
1855 self.host.set_status(models.Host.Status.READY)
1856
Alex Millerba076c52013-07-11 10:11:48 -07001857 if self._should_pending():
Dan Shi07e09af2013-04-12 09:31:29 -07001858 self.queue_entry.on_pending()
1859
1860
showarda9545c02009-12-18 22:44:26 +00001861class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
1862 """
1863 Common functionality for QueueTask and HostlessQueueTask
1864 """
1865 def __init__(self, queue_entries):
1866 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001867 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001868 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001869
1870
showard73ec0442009-02-07 02:05:20 +00001871 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001872 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001873
1874
jamesrenc44ae992010-02-19 00:12:54 +00001875 def _write_control_file(self, execution_path):
1876 control_path = _drone_manager.attach_file_to_execution(
1877 execution_path, self.job.control_file)
1878 return control_path
1879
1880
Aviv Keshet308e7362013-05-21 14:43:16 -07001881 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001882 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001883 execution_path = self.queue_entries[0].execution_path()
1884 control_path = self._write_control_file(execution_path)
1885 hostnames = ','.join(entry.host.hostname
1886 for entry in self.queue_entries
1887 if not entry.is_hostless())
1888
1889 execution_tag = self.queue_entries[0].execution_tag()
1890 params = _autoserv_command_line(
1891 hostnames,
beepscb6f1e22013-06-28 19:14:10 -07001892 ['-P', execution_tag, '-n', '--verify_job_repo_url',
jamesrenc44ae992010-02-19 00:12:54 +00001893 _drone_manager.absolute_path(control_path)],
1894 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001895 if self.job.is_image_update_job():
1896 params += ['--image', self.job.update_image_path]
1897
jamesrenc44ae992010-02-19 00:12:54 +00001898 return params
showardd1195652009-12-08 22:21:02 +00001899
1900
1901 @property
1902 def num_processes(self):
1903 return len(self.queue_entries)
1904
1905
1906 @property
1907 def owner_username(self):
1908 return self.job.owner
1909
1910
1911 def _working_directory(self):
1912 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001913
1914
jadmanski0afbb632008-06-06 21:10:57 +00001915 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001916 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001917 keyval_dict = self.job.keyval_dict()
1918 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001919 group_name = self.queue_entries[0].get_group_name()
1920 if group_name:
1921 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001922 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001923 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001924 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001925 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001926
1927
showard35162b02009-03-03 02:17:30 +00001928 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001929 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001930 _drone_manager.write_lines_to_file(error_file_path,
1931 [_LOST_PROCESS_ERROR])
1932
1933
showardd3dc1992009-04-22 21:01:40 +00001934 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001935 if not self.monitor:
1936 return
1937
showardd9205182009-04-27 20:09:55 +00001938 self._write_job_finished()
1939
showard35162b02009-03-03 02:17:30 +00001940 if self.monitor.lost_process:
1941 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001942
jadmanskif7fa2cc2008-10-01 14:13:23 +00001943
showardcbd74612008-11-19 21:42:02 +00001944 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001945 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001946 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001947 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001948 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001949
1950
jadmanskif7fa2cc2008-10-01 14:13:23 +00001951 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001952 if not self.monitor or not self.monitor.has_process():
1953 return
1954
jadmanskif7fa2cc2008-10-01 14:13:23 +00001955 # build up sets of all the aborted_by and aborted_on values
1956 aborted_by, aborted_on = set(), set()
1957 for queue_entry in self.queue_entries:
1958 if queue_entry.aborted_by:
1959 aborted_by.add(queue_entry.aborted_by)
1960 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1961 aborted_on.add(t)
1962
1963 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001964 # TODO(showard): this conditional is now obsolete, we just need to leave
1965 # it in temporarily for backwards compatibility over upgrades. delete
1966 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001967 assert len(aborted_by) <= 1
1968 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001969 aborted_by_value = aborted_by.pop()
1970 aborted_on_value = max(aborted_on)
1971 else:
1972 aborted_by_value = 'autotest_system'
1973 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001974
showarda0382352009-02-11 23:36:43 +00001975 self._write_keyval_after_job("aborted_by", aborted_by_value)
1976 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001977
showardcbd74612008-11-19 21:42:02 +00001978 aborted_on_string = str(datetime.datetime.fromtimestamp(
1979 aborted_on_value))
1980 self._write_status_comment('Job aborted by %s on %s' %
1981 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001982
1983
jadmanski0afbb632008-06-06 21:10:57 +00001984 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001985 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001986 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001987 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001988
1989
jadmanski0afbb632008-06-06 21:10:57 +00001990 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001991 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001992 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001993
1994
1995class QueueTask(AbstractQueueTask):
1996 def __init__(self, queue_entries):
1997 super(QueueTask, self).__init__(queue_entries)
1998 self._set_ids(queue_entries=queue_entries)
1999
2000
2001 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002002 self._check_queue_entry_statuses(
2003 self.queue_entries,
2004 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
2005 models.HostQueueEntry.Status.RUNNING),
2006 allowed_host_statuses=(models.Host.Status.PENDING,
2007 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00002008
2009 super(QueueTask, self).prolog()
2010
2011 for queue_entry in self.queue_entries:
2012 self._write_host_keyvals(queue_entry.host)
2013 queue_entry.host.set_status(models.Host.Status.RUNNING)
2014 queue_entry.host.update_field('dirty', 1)
showarda9545c02009-12-18 22:44:26 +00002015
2016
2017 def _finish_task(self):
2018 super(QueueTask, self)._finish_task()
2019
2020 for queue_entry in self.queue_entries:
2021 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00002022 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00002023
2024
mbligh4608b002010-01-05 18:22:35 +00002025class HostlessQueueTask(AbstractQueueTask):
2026 def __init__(self, queue_entry):
2027 super(HostlessQueueTask, self).__init__([queue_entry])
2028 self.queue_entry_ids = [queue_entry.id]
2029
2030
2031 def prolog(self):
2032 self.queue_entries[0].update_field('execution_subdir', 'hostless')
2033 super(HostlessQueueTask, self).prolog()
2034
2035
mbligh4608b002010-01-05 18:22:35 +00002036 def _finish_task(self):
2037 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00002038 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00002039
2040
showardd3dc1992009-04-22 21:01:40 +00002041class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00002042 def __init__(self, queue_entries, log_file_name):
2043 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00002044
showardd1195652009-12-08 22:21:02 +00002045 self.queue_entries = queue_entries
2046
showardd3dc1992009-04-22 21:01:40 +00002047 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00002048 self._autoserv_monitor.attach_to_existing_process(
2049 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00002050
showardd1195652009-12-08 22:21:02 +00002051
2052 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00002053 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00002054 return 'true'
2055 return self._generate_command(
2056 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00002057
2058
2059 def _generate_command(self, results_dir):
2060 raise NotImplementedError('Subclasses must override this')
2061
2062
showardd1195652009-12-08 22:21:02 +00002063 @property
2064 def owner_username(self):
2065 return self.queue_entries[0].job.owner
2066
2067
2068 def _working_directory(self):
2069 return self._get_consistent_execution_path(self.queue_entries)
2070
2071
2072 def _paired_with_monitor(self):
2073 return self._autoserv_monitor
2074
2075
showardd3dc1992009-04-22 21:01:40 +00002076 def _job_was_aborted(self):
2077 was_aborted = None
showardd1195652009-12-08 22:21:02 +00002078 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002079 queue_entry.update_from_database()
2080 if was_aborted is None: # first queue entry
2081 was_aborted = bool(queue_entry.aborted)
2082 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
jamesren17cadd62010-06-16 23:26:55 +00002083 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
2084 for entry in self.queue_entries]
showardd3dc1992009-04-22 21:01:40 +00002085 email_manager.manager.enqueue_notify_email(
jamesren17cadd62010-06-16 23:26:55 +00002086 'Inconsistent abort state',
2087 'Queue entries have inconsistent abort state:\n' +
2088 '\n'.join(entries))
showardd3dc1992009-04-22 21:01:40 +00002089 # don't crash here, just assume true
2090 return True
2091 return was_aborted
2092
2093
showardd1195652009-12-08 22:21:02 +00002094 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002095 if self._job_was_aborted():
2096 return models.HostQueueEntry.Status.ABORTED
2097
2098 # we'll use a PidfileRunMonitor to read the autoserv exit status
2099 if self._autoserv_monitor.exit_code() == 0:
2100 return models.HostQueueEntry.Status.COMPLETED
2101 return models.HostQueueEntry.Status.FAILED
2102
2103
showardd3dc1992009-04-22 21:01:40 +00002104 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002105 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002106 queue_entry.set_status(status)
2107
2108
2109 def abort(self):
2110 # override AgentTask.abort() to avoid killing the process and ending
2111 # the task. post-job tasks continue when the job is aborted.
2112 pass
2113
2114
mbligh4608b002010-01-05 18:22:35 +00002115 def _pidfile_label(self):
2116 # '.autoserv_execute' -> 'autoserv'
2117 return self._pidfile_name()[1:-len('_execute')]
2118
2119
showard9bb960b2009-11-19 01:02:11 +00002120class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002121 """
2122 Task responsible for
2123 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2124 * copying logs to the results repository
2125 * spawning CleanupTasks for hosts, if necessary
2126 * spawning a FinalReparseTask for the job
2127 """
showardd1195652009-12-08 22:21:02 +00002128 def __init__(self, queue_entries, recover_run_monitor=None):
2129 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002130 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002131 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002132 self._set_ids(queue_entries=queue_entries)
2133
2134
Aviv Keshet308e7362013-05-21 14:43:16 -07002135 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd3dc1992009-04-22 21:01:40 +00002136 def _generate_command(self, results_dir):
2137 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002138 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00002139        return [_autoserv_path, '-p',
2140 '--pidfile-label=%s' % self._pidfile_label(),
2141 '--use-existing-results', '--collect-crashinfo',
2142 '-m', host_list, '-r', results_dir]
showardd3dc1992009-04-22 21:01:40 +00002143
2144
showardd1195652009-12-08 22:21:02 +00002145 @property
2146 def num_processes(self):
2147 return len(self.queue_entries)
2148
2149
2150 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002151 return drone_manager.CRASHINFO_PID_FILE
showardd1195652009-12-08 22:21:02 +00002152
2153
showardd3dc1992009-04-22 21:01:40 +00002154 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002155 self._check_queue_entry_statuses(
2156 self.queue_entries,
2157 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
2158 allowed_host_statuses=(models.Host.Status.RUNNING,))
showard8cc058f2009-09-08 16:26:33 +00002159
showardd3dc1992009-04-22 21:01:40 +00002160 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002161
2162
showardd3dc1992009-04-22 21:01:40 +00002163 def epilog(self):
2164 super(GatherLogsTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002165 self._parse_results(self.queue_entries)
showard9bb960b2009-11-19 01:02:11 +00002166 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002167
showard9bb960b2009-11-19 01:02:11 +00002168
2169 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002170 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002171 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002172 models.HostQueueEntry.Status.COMPLETED)
2173 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2174 else:
2175 final_success = False
2176 num_tests_failed = 0
showard9bb960b2009-11-19 01:02:11 +00002177 reboot_after = self._job.reboot_after
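        # Summarizing the decision below: always reboot when the job was
        # aborted or any test failed; otherwise follow the job's reboot_after
        # setting (ALWAYS, or IF_ALL_TESTS_PASSED when the run succeeded with
        # zero failures).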
2178 do_reboot = (
2179 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002180 self._final_status() == models.HostQueueEntry.Status.ABORTED
jamesrendd855242010-03-02 22:23:44 +00002181 or reboot_after == model_attributes.RebootAfter.ALWAYS
2182 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
Dan Shi07e09af2013-04-12 09:31:29 -07002183 and final_success and num_tests_failed == 0)
2184 or num_tests_failed > 0)
showard9bb960b2009-11-19 01:02:11 +00002185
showardd1195652009-12-08 22:21:02 +00002186 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002187 if do_reboot:
2188 # don't pass the queue entry to the CleanupTask. if the cleanup
2189 # fails, the job doesn't care -- it's over.
2190 models.SpecialTask.objects.create(
2191 host=models.Host.objects.get(id=queue_entry.host.id),
2192 task=models.SpecialTask.Task.CLEANUP,
2193 requested_by=self._job.owner_model())
2194 else:
2195 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002196
2197
showard0bbfc212009-04-29 21:06:13 +00002198 def run(self):
showard597bfd32009-05-08 18:22:50 +00002199 autoserv_exit_code = self._autoserv_monitor.exit_code()
2200 # only run if Autoserv exited due to some signal. if we have no exit
2201 # code, assume something bad (and signal-like) happened.
2202 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002203 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002204 else:
2205 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002206
2207
mbligh4608b002010-01-05 18:22:35 +00002208class SelfThrottledPostJobTask(PostJobTask):
2209 """
2210 Special AgentTask subclass that maintains its own global process limit.
2211 """
2212 _num_running_processes = 0
showarded2afea2009-07-07 20:54:07 +00002213
2214
mbligh4608b002010-01-05 18:22:35 +00002215 @classmethod
2216 def _increment_running_processes(cls):
2217 cls._num_running_processes += 1
mbligh16c722d2008-03-05 00:58:44 +00002218
mblighd5c95802008-03-05 00:33:46 +00002219
mbligh4608b002010-01-05 18:22:35 +00002220 @classmethod
2221 def _decrement_running_processes(cls):
2222 cls._num_running_processes -= 1
showard8cc058f2009-09-08 16:26:33 +00002223
2224
mbligh4608b002010-01-05 18:22:35 +00002225 @classmethod
2226 def _max_processes(cls):
2227 raise NotImplementedError
2228
2229
2230 @classmethod
2231 def _can_run_new_process(cls):
2232 return cls._num_running_processes < cls._max_processes()
2233
2234
2235 def _process_started(self):
2236 return bool(self.monitor)
2237
2238
2239 def tick(self):
2240 # override tick to keep trying to start until the process count goes
2241 # down and we can, at which point we revert to default behavior
2242 if self._process_started():
2243 super(SelfThrottledPostJobTask, self).tick()
2244 else:
2245 self._try_starting_process()
2246
2247
2248 def run(self):
2249 # override run() to not actually run unless we can
2250 self._try_starting_process()
2251
2252
2253 def _try_starting_process(self):
2254 if not self._can_run_new_process():
showard775300b2009-09-09 15:30:50 +00002255 return
2256
mbligh4608b002010-01-05 18:22:35 +00002257 # actually run the command
2258 super(SelfThrottledPostJobTask, self).run()
jamesren25663562010-04-27 18:00:55 +00002259 if self._process_started():
2260 self._increment_running_processes()
mblighd5c95802008-03-05 00:33:46 +00002261
mblighd5c95802008-03-05 00:33:46 +00002262
mbligh4608b002010-01-05 18:22:35 +00002263 def finished(self, success):
2264 super(SelfThrottledPostJobTask, self).finished(success)
2265 if self._process_started():
2266 self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002267
showard21baa452008-10-21 00:08:39 +00002268
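# Throttling in practice (illustrative numbers): with max_parse_processes set
# to 5 in the scheduler config, at most five FinalReparseTask processes run at
# a time across the scheduler; further parse tasks simply retry
# _try_starting_process() on every tick until a slot frees up. The same
# mechanism limits ArchiveResultsTask via max_transfer_processes.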
mbligh4608b002010-01-05 18:22:35 +00002269class FinalReparseTask(SelfThrottledPostJobTask):
showardd1195652009-12-08 22:21:02 +00002270 def __init__(self, queue_entries):
2271 super(FinalReparseTask, self).__init__(queue_entries,
2272 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002273 # don't use _set_ids, since we don't want to set the host_ids
2274 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002275
2276
2277 def _generate_command(self, results_dir):
mbligh4608b002010-01-05 18:22:35 +00002278 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd1195652009-12-08 22:21:02 +00002279 results_dir]
2280
2281
2282 @property
2283 def num_processes(self):
2284 return 0 # don't include parser processes in accounting
2285
2286
2287 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002288 return drone_manager.PARSER_PID_FILE
showardd1195652009-12-08 22:21:02 +00002289
2290
showard97aed502008-11-04 02:01:24 +00002291 @classmethod
mbligh4608b002010-01-05 18:22:35 +00002292 def _max_processes(cls):
2293 return scheduler_config.config.max_parse_processes
showard97aed502008-11-04 02:01:24 +00002294
2295
2296 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002297 self._check_queue_entry_statuses(
2298 self.queue_entries,
2299 allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
showard8cc058f2009-09-08 16:26:33 +00002300
showard97aed502008-11-04 02:01:24 +00002301 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002302
2303
2304 def epilog(self):
2305 super(FinalReparseTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002306 self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002307
2308
mbligh4608b002010-01-05 18:22:35 +00002309class ArchiveResultsTask(SelfThrottledPostJobTask):
showarde1575b52010-01-15 00:21:12 +00002310 _ARCHIVING_FAILED_FILE = '.archiver_failed'
2311
mbligh4608b002010-01-05 18:22:35 +00002312 def __init__(self, queue_entries):
2313 super(ArchiveResultsTask, self).__init__(queue_entries,
2314 log_file_name='.archiving.log')
2315 # don't use _set_ids, since we don't want to set the host_ids
2316 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00002317
2318
mbligh4608b002010-01-05 18:22:35 +00002319 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002320 return drone_manager.ARCHIVER_PID_FILE
showard97aed502008-11-04 02:01:24 +00002321
2322
Aviv Keshet308e7362013-05-21 14:43:16 -07002323 # TODO: Refactor into autoserv_utils. crbug.com/243090
mbligh4608b002010-01-05 18:22:35 +00002324 def _generate_command(self, results_dir):
2325 return [_autoserv_path , '-p',
2326 '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
mblighe0cbc912010-03-11 18:03:07 +00002327 '--use-existing-results', '--control-filename=control.archive',
showard948eb302010-01-15 00:16:20 +00002328 os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
2329 'archive_results.control.srv')]
showard97aed502008-11-04 02:01:24 +00002330
2331
mbligh4608b002010-01-05 18:22:35 +00002332 @classmethod
2333 def _max_processes(cls):
2334 return scheduler_config.config.max_transfer_processes
showarda9545c02009-12-18 22:44:26 +00002335
2336
2337 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002338 self._check_queue_entry_statuses(
2339 self.queue_entries,
2340 allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
2341
2342 super(ArchiveResultsTask, self).prolog()
showarda9545c02009-12-18 22:44:26 +00002343
2344
mbligh4608b002010-01-05 18:22:35 +00002345 def epilog(self):
2346 super(ArchiveResultsTask, self).epilog()
showard4076c632010-01-15 20:28:49 +00002347 if not self.success and self._paired_with_monitor().has_process():
showarde1575b52010-01-15 00:21:12 +00002348 failed_file = os.path.join(self._working_directory(),
2349 self._ARCHIVING_FAILED_FILE)
2350 paired_process = self._paired_with_monitor().get_process()
2351 _drone_manager.write_lines_to_file(
2352 failed_file, ['Archiving failed with exit code %s'
2353 % self.monitor.exit_code()],
2354 paired_with_process=paired_process)
mbligh4608b002010-01-05 18:22:35 +00002355 self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002356
2357
mbligh36768f02008-02-22 18:28:33 +00002358if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002359 main()