#!/usr/bin/python -u
#pylint: disable-msg=C0111

"""
Autotest scheduler
"""


import datetime, optparse, os, signal
import sys, time, traceback, urllib
import logging, gc

import common
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.server import autoserv_utils
from autotest_lib.site_utils.graphite import stats

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_directory = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server')
_autoserv_path = os.path.join(_autoserv_directory, 'autoserv')
_testing_mode = False
_drone_manager = None

def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
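# Site hook: if autotest_lib.scheduler.site_monitor_db defines parser_path, it
# overrides the default above; otherwise the default (<install_dir>/tko/parse)
# passed to import_site_function() is used.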
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60


def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
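    # For illustration: this file reads enable_scheduler, drones, results_host,
    # pidfile_timeout_mins, gc_stats_interval_mins, tick_debug, extra_debugging
    # and die_on_orphans from scheduler_config.CONFIG_SECTION of the global
    # config.  A minimal sketch of that section (values are hypothetical):
    #     [SCHEDULER]
    #     enable_scheduler: True
    #     drones: localhost
    #     results_host: localhost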

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
            logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
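# Example (hypothetical values): _autoserv_command_line('host1,host2', [])
# returns the autoserv executable path followed by its arguments, as built by
# autoserv_utils.autoserv_run_job_command() and described in the docstring
# above.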


class BaseDispatcher(object):


    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def _log_extra_msg(self, msg):
        if self._extra_debugging:
            logging.debug(msg)


    def tick(self):
        """
        An instrumented version of tick() that records when each major step
        begins, so we can figure out where most of the tick time is spent.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        # host queue entry statuses handled directly by AgentTasks (Verifying
        # is handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask,
                                      ResetTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where="status = '%s'" % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify, then reset.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY,
                               models.SpecialTask.Task.RESET]
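        # Every task type returned by the query above must appear in
        # task_priority_order: task_priority_key() relies on list.index(),
        # which raises ValueError for an unknown task type.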
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='afe_jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list.  Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        logging.debug('Processing %d queue_entries', len(queue_entries))
        for queue_entry in queue_entries:
            self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._log_extra_msg('Scheduling hostless job.')
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)


    def _schedule_running_host_queue_entries(self):
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
                            queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()


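    # Example of the throttling below, with hypothetical numbers: if
    # max_processes_started_per_cycle is 50 and 45 processes have already been
    # started this cycle, an agent whose task needs 10 processes is deferred
    # (45 + 10 > 50) unless it would be the very first agent started this
    # cycle.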
    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True


    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_cycle += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes. %d added this cycle.',
                     _drone_manager.total_running_processes(),
                     num_started_this_cycle)


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


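# The concrete Dispatcher is site-extensible: if a site_monitor_db module
# provides a SiteDispatcher class, it is used as the base class here;
# otherwise BaseDispatcher (the default passed to import_site_class) is used
# directly.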
SiteDispatcher = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    pass


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but
        only used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, num_processes, nice_level=None,
            log_file=None, pidfile_name=None, paired_with_pidfile=None,
            username=None, drone_hostnames_allowed=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            num_processes=num_processes, log_file=log_file,
            paired_with_pidfile=paired_with_pidfile, username=username,
            drone_hostnames_allowed=drone_hostnames_allowed)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=drone_manager.AUTOSERV_PID_FILE,
                                   num_processes=None):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_path, pidfile_name=pidfile_name)
        if num_processes is not None:
            _drone_manager.declare_process_count(self.pidfile_id, num_processes)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                        'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        if time.time() - self._start_time > _get_pidfile_timeout_secs():
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile.  In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """@returns The number of tests that failed or -1 if unknown."""
        self._get_pidfile_info()
        if self._state.num_tests_failed is None:
            return -1
        return self._state.num_tests_failed


    def try_copy_results_on_drone(self, **kwargs):
        if self.has_process():
            # copy results logs into the normal place for job results
            _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)


    def try_copy_to_results_repository(self, source, **kwargs):
        if self.has_process():
            _drone_manager.copy_to_results_repository(self.get_process(),
                                                      source, **kwargs)
1064
1065
mbligh36768f02008-02-22 18:28:33 +00001066class Agent(object):
showard77182562009-06-10 00:16:05 +00001067 """
showard8cc058f2009-09-08 16:26:33 +00001068 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001069
1070 The following methods are required on all task objects:
1071 poll() - Called periodically to let the task check its status and
1072 update its internal state. If the task succeeded.
1073 is_done() - Returns True if the task is finished.
1074 abort() - Called when an abort has been requested. The task must
1075 set its aborted attribute to True if it actually aborted.
1076
1077 The following attributes are required on all task objects:
1078 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001079 success - bool, True if this task succeeded.
1080 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1081 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001082 """
1083
1084
showard418785b2009-11-23 20:19:59 +00001085 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001086 """
showard8cc058f2009-09-08 16:26:33 +00001087 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001088 """
showard8cc058f2009-09-08 16:26:33 +00001089 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001090
showard77182562009-06-10 00:16:05 +00001091 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001092 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001093
showard8cc058f2009-09-08 16:26:33 +00001094 self.queue_entry_ids = task.queue_entry_ids
1095 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001096
showard8cc058f2009-09-08 16:26:33 +00001097 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001098 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001099
1100
jadmanski0afbb632008-06-06 21:10:57 +00001101 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001102 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001103 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001104 self.task.poll()
1105 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001106 self.finished = True
showardec113162008-05-08 00:52:49 +00001107
1108
jadmanski0afbb632008-06-06 21:10:57 +00001109 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001110 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001111
1112
showardd3dc1992009-04-22 21:01:40 +00001113 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001114 if self.task:
1115 self.task.abort()
1116 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001117             # tasks can choose to ignore aborts, but the task here is an afe
Alex Millerd51d8862013-07-23 15:19:36 -07001118             # HQE model, which always aborts when asked to, so this always
1119             # happens given the current code.
showard9bb960b2009-11-19 01:02:11 +00001120 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001121
showardd3dc1992009-04-22 21:01:40 +00001122
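# Illustrative sketch (not part of the scheduler): a minimal object satisfying
# the task contract described in the Agent docstring above -- poll(),
# is_done() and abort(), plus the aborted/success/queue_entry_ids/host_ids
# attributes. Real tasks are AgentTask subclasses; this stub only shows the
# interface the dispatcher relies on.
class _ExampleNoopTask(object):
    def __init__(self):
        self.aborted = False
        self.success = None
        self.queue_entry_ids = []
        self.host_ids = []
        self._done = False


    def poll(self):
        # a real task would check on its process here; this one finishes
        # immediately and reports success
        self._done = True
        self.success = True


    def is_done(self):
        return self._done


    def abort(self):
        self.aborted = True
        self._done = True

# Driving it the way the dispatcher drives any task, e.g.:
#     agent = Agent(_ExampleNoopTask())
#     while not agent.is_done():
#         agent.tick()

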
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001123class BaseAgentTask(object):
showardd1195652009-12-08 22:21:02 +00001124 class _NullMonitor(object):
1125 pidfile_id = None
1126
1127 def has_process(self):
1128 return True
1129
1130
1131 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001132 """
showardd1195652009-12-08 22:21:02 +00001133 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001134 """
jadmanski0afbb632008-06-06 21:10:57 +00001135 self.done = False
showardd1195652009-12-08 22:21:02 +00001136 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001137 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001138 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001139 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001140 self.queue_entry_ids = []
1141 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001142 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001143
1144
1145 def _set_ids(self, host=None, queue_entries=None):
1146 if queue_entries and queue_entries != [None]:
1147 self.host_ids = [entry.host.id for entry in queue_entries]
1148 self.queue_entry_ids = [entry.id for entry in queue_entries]
1149 else:
1150 assert host
1151 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001152
1153
jadmanski0afbb632008-06-06 21:10:57 +00001154 def poll(self):
showard08a36412009-05-05 01:01:13 +00001155 if not self.started:
1156 self.start()
showardd1195652009-12-08 22:21:02 +00001157 if not self.done:
1158 self.tick()
showard08a36412009-05-05 01:01:13 +00001159
1160
1161 def tick(self):
showardd1195652009-12-08 22:21:02 +00001162 assert self.monitor
1163 exit_code = self.monitor.exit_code()
1164 if exit_code is None:
1165 return
mbligh36768f02008-02-22 18:28:33 +00001166
showardd1195652009-12-08 22:21:02 +00001167 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001168 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001169
1170
jadmanski0afbb632008-06-06 21:10:57 +00001171 def is_done(self):
1172 return self.done
mbligh36768f02008-02-22 18:28:33 +00001173
1174
jadmanski0afbb632008-06-06 21:10:57 +00001175 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001176 if self.done:
showardd1195652009-12-08 22:21:02 +00001177 assert self.started
showard08a36412009-05-05 01:01:13 +00001178 return
showardd1195652009-12-08 22:21:02 +00001179 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001180 self.done = True
1181 self.success = success
1182 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001183
1184
jadmanski0afbb632008-06-06 21:10:57 +00001185 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001186 """
1187 To be overridden.
1188 """
showarded2afea2009-07-07 20:54:07 +00001189 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001190 self.register_necessary_pidfiles()
1191
1192
1193 def _log_file(self):
1194 if not self._log_file_name:
1195 return None
1196 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001197
mbligh36768f02008-02-22 18:28:33 +00001198
jadmanski0afbb632008-06-06 21:10:57 +00001199 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001200 log_file = self._log_file()
1201 if self.monitor and log_file:
1202 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001203
1204
jadmanski0afbb632008-06-06 21:10:57 +00001205 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001206 """
1207 To be overridden.
1208 """
jadmanski0afbb632008-06-06 21:10:57 +00001209 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001210 logging.info("%s finished with success=%s", type(self).__name__,
1211 self.success)
1212
mbligh36768f02008-02-22 18:28:33 +00001213
1214
jadmanski0afbb632008-06-06 21:10:57 +00001215 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001216 if not self.started:
1217 self.prolog()
1218 self.run()
1219
1220 self.started = True
1221
1222
1223 def abort(self):
1224 if self.monitor:
1225 self.monitor.kill()
1226 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001227 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001228 self.cleanup()
1229
1230
showarded2afea2009-07-07 20:54:07 +00001231 def _get_consistent_execution_path(self, execution_entries):
1232 first_execution_path = execution_entries[0].execution_path()
1233 for execution_entry in execution_entries[1:]:
1234 assert execution_entry.execution_path() == first_execution_path, (
1235 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1236 execution_entry,
1237 first_execution_path,
1238 execution_entries[0]))
1239 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001240
1241
showarded2afea2009-07-07 20:54:07 +00001242 def _copy_results(self, execution_entries, use_monitor=None):
1243 """
1244 @param execution_entries: list of objects with execution_path() method
1245 """
showard6d1c1432009-08-20 23:30:39 +00001246 if use_monitor is not None and not use_monitor.has_process():
1247 return
1248
showarded2afea2009-07-07 20:54:07 +00001249 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001250 if use_monitor is None:
1251 assert self.monitor
1252 use_monitor = self.monitor
1253 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001254 execution_path = self._get_consistent_execution_path(execution_entries)
1255 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001256 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001257
showarda1e74b32009-05-12 17:32:04 +00001258
1259 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001260 for queue_entry in queue_entries:
1261 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001262
1263
mbligh4608b002010-01-05 18:22:35 +00001264 def _archive_results(self, queue_entries):
1265 for queue_entry in queue_entries:
1266 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001267
1268
showardd1195652009-12-08 22:21:02 +00001269 def _command_line(self):
1270 """
1271 Return the command line to run. Must be overridden.
1272 """
1273 raise NotImplementedError
1274
1275
1276 @property
1277 def num_processes(self):
1278 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001279 Return the number of processes forked by this BaseAgentTask's process.
1280 It may only be approximate. To be overridden if necessary.
showardd1195652009-12-08 22:21:02 +00001281 """
1282 return 1
1283
1284
1285 def _paired_with_monitor(self):
1286 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001287 If this BaseAgentTask's process must run on the same machine as some
showardd1195652009-12-08 22:21:02 +00001288 previous process, this method should be overridden to return a
1289 PidfileRunMonitor for that process.
1290 """
1291 return self._NullMonitor()
1292
1293
1294 @property
1295 def owner_username(self):
1296 """
1297 Return login of user responsible for this task. May be None. Must be
1298 overridden.
1299 """
1300 raise NotImplementedError
1301
1302
1303 def _working_directory(self):
1304 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001305 Return the directory where this BaseAgentTask's process executes.
1306 Must be overridden.
showardd1195652009-12-08 22:21:02 +00001307 """
1308 raise NotImplementedError
1309
1310
1311 def _pidfile_name(self):
1312 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001313 Return the name of the pidfile this BaseAgentTask's process uses. To be
showardd1195652009-12-08 22:21:02 +00001314 overridden if necessary.
1315 """
jamesrenc44ae992010-02-19 00:12:54 +00001316 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001317
1318
1319 def _check_paired_results_exist(self):
1320 if not self._paired_with_monitor().has_process():
1321 email_manager.manager.enqueue_notify_email(
1322 'No paired results in task',
1323 'No paired results in task %s at %s'
1324 % (self, self._paired_with_monitor().pidfile_id))
1325 self.finished(False)
1326 return False
1327 return True
1328
1329
1330 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001331 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001332 self.monitor = PidfileRunMonitor()
1333
1334
1335 def run(self):
1336 if not self._check_paired_results_exist():
1337 return
1338
1339 self._create_monitor()
1340 self.monitor.run(
1341 self._command_line(), self._working_directory(),
1342 num_processes=self.num_processes,
1343 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1344 pidfile_name=self._pidfile_name(),
1345 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001346 username=self.owner_username,
1347 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1348
1349
1350 def get_drone_hostnames_allowed(self):
1351 if not models.DroneSet.drone_sets_enabled():
1352 return None
1353
1354 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1355 if not hqes:
1356 # Only special tasks could be missing host queue entries
1357 assert isinstance(self, SpecialAgentTask)
1358 return self._user_or_global_default_drone_set(
1359 self.task, self.task.requested_by)
1360
1361 job_ids = hqes.values_list('job', flat=True).distinct()
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001362 assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
jamesren76fcf192010-04-21 20:39:50 +00001363 "span multiple jobs")
1364
1365 job = models.Job.objects.get(id=job_ids[0])
1366 drone_set = job.drone_set
1367 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001368 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001369
1370 return drone_set.get_drone_hostnames()
1371
1372
1373 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1374 """
1375 Returns the user's default drone set, if present.
1376
1377 Otherwise, returns the global default drone set.
1378 """
1379 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1380 if not user:
1381 logging.warn('%s had no owner; using default drone set',
1382 obj_with_owner)
1383 return default_hostnames
1384 if not user.drone_set:
1385 logging.warn('User %s has no default drone set, using global '
1386 'default', user.login)
1387 return default_hostnames
1388 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001389
1390
1391 def register_necessary_pidfiles(self):
1392 pidfile_id = _drone_manager.get_pidfile_id_from(
1393 self._working_directory(), self._pidfile_name())
1394 _drone_manager.register_pidfile(pidfile_id)
1395
1396 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1397 if paired_pidfile_id:
1398 _drone_manager.register_pidfile(paired_pidfile_id)
1399
1400
1401 def recover(self):
1402 if not self._check_paired_results_exist():
1403 return
1404
1405 self._create_monitor()
1406 self.monitor.attach_to_existing_process(
1407 self._working_directory(), pidfile_name=self._pidfile_name(),
1408 num_processes=self.num_processes)
1409 if not self.monitor.has_process():
1410 # no process to recover; wait to be started normally
1411 self.monitor = None
1412 return
1413
1414 self.started = True
Aviv Keshet225bdfe2013-03-05 10:10:08 -08001415 logging.info('Recovering process %s for %s at %s',
1416 self.monitor.get_process(), type(self).__name__,
1417 self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001418
1419
mbligh4608b002010-01-05 18:22:35 +00001420 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1421 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001422 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001423 for entry in queue_entries:
1424 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001425 raise host_scheduler.SchedulerError(
1426 '%s attempting to start entry with invalid status %s: '
1427 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001428 invalid_host_status = (
1429 allowed_host_statuses is not None
1430 and entry.host.status not in allowed_host_statuses)
1431 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001432 raise host_scheduler.SchedulerError(
1433 '%s attempting to start on queue entry with invalid '
1434 'host status %s: %s'
1435 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001436
1437
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001438SiteAgentTask = utils.import_site_class(
1439 __file__, 'autotest_lib.scheduler.site_monitor_db',
1440 'SiteAgentTask', BaseAgentTask)
1441
1442class AgentTask(SiteAgentTask):
1443 pass
1444
1445
showardd9205182009-04-27 20:09:55 +00001446class TaskWithJobKeyvals(object):
1447 """AgentTask mixin providing functionality to help with job keyval files."""
1448 _KEYVAL_FILE = 'keyval'
1449 def _format_keyval(self, key, value):
1450 return '%s=%s' % (key, value)
1451
1452
1453 def _keyval_path(self):
1454 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001455 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001456
1457
1458 def _write_keyval_after_job(self, field, value):
1459 assert self.monitor
1460 if not self.monitor.has_process():
1461 return
1462 _drone_manager.write_lines_to_file(
1463 self._keyval_path(), [self._format_keyval(field, value)],
1464 paired_with_process=self.monitor.get_process())
1465
1466
1467 def _job_queued_keyval(self, job):
1468 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1469
1470
1471 def _write_job_finished(self):
1472 self._write_keyval_after_job("job_finished", int(time.time()))
1473
1474
showarddb502762009-09-09 15:31:20 +00001475 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1476 keyval_contents = '\n'.join(self._format_keyval(key, value)
1477 for key, value in keyval_dict.iteritems())
1478 # always end with a newline to allow additional keyvals to be written
1479 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001480 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001481 keyval_contents,
1482 file_path=keyval_path)
1483
1484
1485 def _write_keyvals_before_job(self, keyval_dict):
1486 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1487
1488
1489 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001490 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001491 host.hostname)
1492 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001493 all_labels = [ urllib.quote(label) for label in all_labels ]
showarddb502762009-09-09 15:31:20 +00001494 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1495 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1496
1497
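# Illustrative sketch (not part of the scheduler): the keyval files written by
# the TaskWithJobKeyvals helpers above are plain "key=value" lines, always
# terminated by a newline so that later writers can append. For instance,
# passing {'job_queued': 1300000000, 'host_group_name': 'group0'} (made-up
# values) to _write_keyvals_before_job() attaches a file containing:
#
#     job_queued=1300000000
#     host_group_name=group0
#
# The hypothetical helper below rebuilds that content the same way
# _write_keyvals_before_job_helper() does.
def _example_keyval_contents(keyval_dict):
    contents = '\n'.join('%s=%s' % (key, value)
                         for key, value in keyval_dict.iteritems())
    return contents + '\n'

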
showard8cc058f2009-09-08 16:26:33 +00001498class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001499 """
1500 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1501 """
1502
1503 TASK_TYPE = None
1504 host = None
1505 queue_entry = None
1506
showardd1195652009-12-08 22:21:02 +00001507 def __init__(self, task, extra_command_args):
1508 super(SpecialAgentTask, self).__init__()
1509
lmrb7c5d272010-04-16 06:34:04 +00001510 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001511
jamesrenc44ae992010-02-19 00:12:54 +00001512 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001513 self.queue_entry = None
1514 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001515 self.queue_entry = scheduler_models.HostQueueEntry(
1516 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001517
showarded2afea2009-07-07 20:54:07 +00001518 self.task = task
1519 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001520
1521
showard8cc058f2009-09-08 16:26:33 +00001522 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001523 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1524
1525
1526 def _command_line(self):
1527 return _autoserv_command_line(self.host.hostname,
1528 self._extra_command_args,
1529 queue_entry=self.queue_entry)
1530
1531
1532 def _working_directory(self):
1533 return self.task.execution_path()
1534
1535
1536 @property
1537 def owner_username(self):
1538 if self.task.requested_by:
1539 return self.task.requested_by.login
1540 return None
showard8cc058f2009-09-08 16:26:33 +00001541
1542
showarded2afea2009-07-07 20:54:07 +00001543 def prolog(self):
1544 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001545 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001546 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001547
1548
showardde634ee2009-01-30 01:44:24 +00001549 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001550 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001551
showard2fe3f1d2009-07-06 20:19:11 +00001552 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001553 return # don't fail metahost entries, they'll be reassigned
1554
showard2fe3f1d2009-07-06 20:19:11 +00001555 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001556 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001557 return # entry has been aborted
1558
showard2fe3f1d2009-07-06 20:19:11 +00001559 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001560 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001561 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001562 self._write_keyval_after_job(queued_key, queued_time)
1563 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001564
showard8cc058f2009-09-08 16:26:33 +00001565 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001566 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001567 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001568 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001569
showard8cc058f2009-09-08 16:26:33 +00001570 pidfile_id = _drone_manager.get_pidfile_id_from(
1571 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001572 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001573 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001574
1575 if self.queue_entry.job.parse_failed_repair:
1576 self._parse_results([self.queue_entry])
1577 else:
1578 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001579
Alex Miller23676a22013-07-03 09:03:36 -07001580 # Also fail all other special tasks that have not yet run for this HQE
1581 pending_tasks = models.SpecialTask.objects.filter(
1582 queue_entry__id=self.queue_entry.id,
1583 is_complete=0)
1584 if pending_tasks:
1585 for task in pending_tasks:
1586 task.finish(False)
1587
showard8cc058f2009-09-08 16:26:33 +00001588
1589 def cleanup(self):
1590 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001591
1592 # We will consider an aborted task to be "Failed"
1593 self.task.finish(bool(self.success))
1594
showardf85a0b72009-10-07 20:48:45 +00001595 if self.monitor:
1596 if self.monitor.has_process():
1597 self._copy_results([self.task])
1598 if self.monitor.pidfile_id is not None:
1599 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001600
1601
Dan Shi07e09af2013-04-12 09:31:29 -07001602 def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
1603         """Remove queued special tasks of a given type, optionally keeping the last one.
1604
1605 @param special_task_to_remove: type of special task to be removed, e.g.,
1606 models.SpecialTask.Task.VERIFY.
1607         @param keep_last_one: True to keep the last special task if it is of
1608                 the same type as special_task_to_remove.
1609
1610 """
1611 queued_special_tasks = models.SpecialTask.objects.filter(
1612 host__id=self.host.id,
1613 task=special_task_to_remove,
1614 is_active=False, is_complete=False, queue_entry=None)
1615 if keep_last_one:
1616 queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
1617 queued_special_tasks.delete()
1618
1619
showard8cc058f2009-09-08 16:26:33 +00001620class RepairTask(SpecialAgentTask):
1621 TASK_TYPE = models.SpecialTask.Task.REPAIR
1622
1623
showardd1195652009-12-08 22:21:02 +00001624 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001625 """\
1626         @param task: the models.SpecialTask to run; if it has a queue entry,
                 that entry is marked failed when this repair fails.
1627 """
1628 protection = host_protections.Protection.get_string(
1629 task.host.protection)
1630 # normalize the protection name
1631 protection = host_protections.Protection.get_attr_name(protection)
1632
1633 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001634 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001635
1636 # *don't* include the queue entry in IDs -- if the queue entry is
1637 # aborted, we want to leave the repair task running
1638 self._set_ids(host=self.host)
1639
1640
1641 def prolog(self):
1642 super(RepairTask, self).prolog()
1643 logging.info("repair_task starting")
1644 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001645
1646
jadmanski0afbb632008-06-06 21:10:57 +00001647 def epilog(self):
1648 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001649
jadmanski0afbb632008-06-06 21:10:57 +00001650 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001651 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001652 else:
showard8cc058f2009-09-08 16:26:33 +00001653 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001654 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001655 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001656
1657
showarded2afea2009-07-07 20:54:07 +00001658class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001659 def _copy_to_results_repository(self):
1660 if not self.queue_entry or self.queue_entry.meta_host:
1661 return
1662
1663 self.queue_entry.set_execution_subdir()
1664 log_name = os.path.basename(self.task.execution_path())
1665 source = os.path.join(self.task.execution_path(), 'debug',
1666 'autoserv.DEBUG')
1667 destination = os.path.join(
1668 self.queue_entry.execution_path(), log_name)
1669
1670 self.monitor.try_copy_to_results_repository(
1671 source, destination_path=destination)
1672
1673
showard170873e2009-01-07 00:22:26 +00001674 def epilog(self):
1675 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001676
showard775300b2009-09-09 15:30:50 +00001677 if self.success:
1678 return
showard8fe93b52008-11-18 17:53:22 +00001679
showard775300b2009-09-09 15:30:50 +00001680 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001681
showard775300b2009-09-09 15:30:50 +00001682 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001683 # effectively ignore failure for these hosts
1684 self.success = True
showard775300b2009-09-09 15:30:50 +00001685 return
1686
1687 if self.queue_entry:
1688 self.queue_entry.requeue()
1689
1690 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001691 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001692 queue_entry__id=self.queue_entry.id):
1693 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1694 self._fail_queue_entry()
1695 return
1696
showard9bb960b2009-11-19 01:02:11 +00001697 queue_entry = models.HostQueueEntry.objects.get(
1698 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001699 else:
1700 queue_entry = None
1701
1702 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001703 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001704 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001705 queue_entry=queue_entry,
1706 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001707
showard8fe93b52008-11-18 17:53:22 +00001708
Alex Miller42437f92013-05-28 12:58:54 -07001709 def _should_pending(self):
1710 """
1711 Decide if we should call the host queue entry's on_pending method.
1712 We should if:
1713 1) There exists an associated host queue entry.
1714 2) The current special task completed successfully.
1715 3) There do not exist any more special tasks to be run before the
1716 host queue entry starts.
1717
1718         @returns: True if we should call on_pending, False if not.
1719
1720 """
1721 if not self.queue_entry or not self.success:
1722 return False
1723
1724 # We know if this is the last one when we create it, so we could add
1725 # another column to the database to keep track of this information, but
1726 # I expect the overhead of querying here to be minimal.
1727 queue_entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1728 queued = models.SpecialTask.objects.filter(
1729 host__id=self.host.id, is_active=False,
1730 is_complete=False, queue_entry=queue_entry)
1731 queued = queued.exclude(id=self.task.id)
1732 return queued.count() == 0
1733
1734
showard8fe93b52008-11-18 17:53:22 +00001735class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001736 TASK_TYPE = models.SpecialTask.Task.VERIFY
1737
1738
showardd1195652009-12-08 22:21:02 +00001739 def __init__(self, task):
1740 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001741 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001742
1743
jadmanski0afbb632008-06-06 21:10:57 +00001744 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001745 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001746
showardb18134f2009-03-20 20:52:18 +00001747 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001748 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001749 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1750 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001751
jamesren42318f72010-05-10 23:40:59 +00001752 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00001753 # and there's no need to keep records of other requests.
Dan Shi07e09af2013-04-12 09:31:29 -07001754 self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
1755 keep_last_one=True)
showard2fe3f1d2009-07-06 20:19:11 +00001756
mbligh36768f02008-02-22 18:28:33 +00001757
jadmanski0afbb632008-06-06 21:10:57 +00001758 def epilog(self):
1759 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001760 if self.success:
Alex Miller42437f92013-05-28 12:58:54 -07001761 if self._should_pending():
showard8cc058f2009-09-08 16:26:33 +00001762 self.queue_entry.on_pending()
1763 else:
1764 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001765
1766
mbligh4608b002010-01-05 18:22:35 +00001767class CleanupTask(PreJobTask):
1768 # note this can also run post-job, but when it does, it's running standalone
1769 # against the host (not related to the job), so it's not considered a
1770 # PostJobTask
1771
1772 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1773
1774
1775 def __init__(self, task, recover_run_monitor=None):
1776 super(CleanupTask, self).__init__(task, ['--cleanup'])
1777 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1778
1779
1780 def prolog(self):
1781 super(CleanupTask, self).prolog()
1782 logging.info("starting cleanup task for host: %s", self.host.hostname)
1783 self.host.set_status(models.Host.Status.CLEANING)
1784 if self.queue_entry:
Dan Shi07e09af2013-04-12 09:31:29 -07001785 self.queue_entry.set_status(models.HostQueueEntry.Status.CLEANING)
mbligh4608b002010-01-05 18:22:35 +00001786
1787
1788 def _finish_epilog(self):
1789 if not self.queue_entry or not self.success:
1790 return
1791
1792 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
1793 should_run_verify = (
1794 self.queue_entry.job.run_verify
1795 and self.host.protection != do_not_verify_protection)
1796 if should_run_verify:
1797 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1798 models.SpecialTask.objects.create(
1799 host=models.Host.objects.get(id=self.host.id),
1800 queue_entry=entry,
1801 task=models.SpecialTask.Task.VERIFY)
1802 else:
Alex Miller42437f92013-05-28 12:58:54 -07001803 if self._should_pending():
1804 self.queue_entry.on_pending()
mbligh4608b002010-01-05 18:22:35 +00001805
1806
1807 def epilog(self):
1808 super(CleanupTask, self).epilog()
1809
1810 if self.success:
1811 self.host.update_field('dirty', 0)
1812 self.host.set_status(models.Host.Status.READY)
1813
1814 self._finish_epilog()
1815
1816
Dan Shi07e09af2013-04-12 09:31:29 -07001817class ResetTask(PreJobTask):
1818 """Task to reset a DUT, including cleanup and verify."""
1819 # note this can also run post-job, but when it does, it's running standalone
1820 # against the host (not related to the job), so it's not considered a
1821 # PostJobTask
1822
1823 TASK_TYPE = models.SpecialTask.Task.RESET
1824
1825
1826 def __init__(self, task, recover_run_monitor=None):
1827 super(ResetTask, self).__init__(task, ['--reset'])
1828 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1829
1830
1831 def prolog(self):
1832 super(ResetTask, self).prolog()
1833 logging.info('starting reset task for host: %s',
1834 self.host.hostname)
1835 self.host.set_status(models.Host.Status.RESETTING)
1836 if self.queue_entry:
1837 self.queue_entry.set_status(models.HostQueueEntry.Status.RESETTING)
1838
1839 # Delete any queued cleanups for this host.
1840 self.remove_special_tasks(models.SpecialTask.Task.CLEANUP,
1841 keep_last_one=False)
1842
1843 # Delete any queued reverifies for this host.
1844 self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
1845 keep_last_one=False)
1846
1847 # Only one reset is needed.
1848 self.remove_special_tasks(models.SpecialTask.Task.RESET,
1849 keep_last_one=True)
1850
1851
1852 def epilog(self):
1853 super(ResetTask, self).epilog()
1854
1855 if self.success:
1856 self.host.update_field('dirty', 0)
1857 self.host.set_status(models.Host.Status.READY)
1858
Alex Millerba076c52013-07-11 10:11:48 -07001859 if self._should_pending():
Dan Shi07e09af2013-04-12 09:31:29 -07001860 self.queue_entry.on_pending()
1861
1862
showarda9545c02009-12-18 22:44:26 +00001863class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
1864 """
1865 Common functionality for QueueTask and HostlessQueueTask
1866 """
1867 def __init__(self, queue_entries):
1868 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001869 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001870 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001871
1872
showard73ec0442009-02-07 02:05:20 +00001873 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001874 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001875
1876
jamesrenc44ae992010-02-19 00:12:54 +00001877 def _write_control_file(self, execution_path):
1878 control_path = _drone_manager.attach_file_to_execution(
1879 execution_path, self.job.control_file)
1880 return control_path
1881
1882
Aviv Keshet308e7362013-05-21 14:43:16 -07001883 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001884 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001885 execution_path = self.queue_entries[0].execution_path()
1886 control_path = self._write_control_file(execution_path)
1887 hostnames = ','.join(entry.host.hostname
1888 for entry in self.queue_entries
1889 if not entry.is_hostless())
1890
1891 execution_tag = self.queue_entries[0].execution_tag()
1892 params = _autoserv_command_line(
1893 hostnames,
beepscb6f1e22013-06-28 19:14:10 -07001894 ['-P', execution_tag, '-n', '--verify_job_repo_url',
jamesrenc44ae992010-02-19 00:12:54 +00001895 _drone_manager.absolute_path(control_path)],
1896 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001897 if self.job.is_image_update_job():
1898 params += ['--image', self.job.update_image_path]
1899
jamesrenc44ae992010-02-19 00:12:54 +00001900 return params
showardd1195652009-12-08 22:21:02 +00001901
1902
1903 @property
1904 def num_processes(self):
1905 return len(self.queue_entries)
1906
1907
1908 @property
1909 def owner_username(self):
1910 return self.job.owner
1911
1912
1913 def _working_directory(self):
1914 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001915
1916
jadmanski0afbb632008-06-06 21:10:57 +00001917 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001918 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001919 keyval_dict = self.job.keyval_dict()
1920 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001921 group_name = self.queue_entries[0].get_group_name()
1922 if group_name:
1923 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001924 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001925 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001926 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001927 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001928
1929
showard35162b02009-03-03 02:17:30 +00001930 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001931 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001932 _drone_manager.write_lines_to_file(error_file_path,
1933 [_LOST_PROCESS_ERROR])
1934
1935
showardd3dc1992009-04-22 21:01:40 +00001936 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001937 if not self.monitor:
1938 return
1939
showardd9205182009-04-27 20:09:55 +00001940 self._write_job_finished()
1941
showard35162b02009-03-03 02:17:30 +00001942 if self.monitor.lost_process:
1943 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001944
jadmanskif7fa2cc2008-10-01 14:13:23 +00001945
showardcbd74612008-11-19 21:42:02 +00001946 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001947 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001948 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001949 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001950 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001951
1952
jadmanskif7fa2cc2008-10-01 14:13:23 +00001953 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001954 if not self.monitor or not self.monitor.has_process():
1955 return
1956
jadmanskif7fa2cc2008-10-01 14:13:23 +00001957 # build up sets of all the aborted_by and aborted_on values
1958 aborted_by, aborted_on = set(), set()
1959 for queue_entry in self.queue_entries:
1960 if queue_entry.aborted_by:
1961 aborted_by.add(queue_entry.aborted_by)
1962 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1963 aborted_on.add(t)
1964
1965 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001966 # TODO(showard): this conditional is now obsolete, we just need to leave
1967 # it in temporarily for backwards compatibility over upgrades. delete
1968 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001969 assert len(aborted_by) <= 1
1970 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001971 aborted_by_value = aborted_by.pop()
1972 aborted_on_value = max(aborted_on)
1973 else:
1974 aborted_by_value = 'autotest_system'
1975 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001976
showarda0382352009-02-11 23:36:43 +00001977 self._write_keyval_after_job("aborted_by", aborted_by_value)
1978 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001979
showardcbd74612008-11-19 21:42:02 +00001980 aborted_on_string = str(datetime.datetime.fromtimestamp(
1981 aborted_on_value))
1982 self._write_status_comment('Job aborted by %s on %s' %
1983 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001984
1985
jadmanski0afbb632008-06-06 21:10:57 +00001986 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001987 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001988 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001989 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001990
1991
jadmanski0afbb632008-06-06 21:10:57 +00001992 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001993 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001994 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001995
1996
1997class QueueTask(AbstractQueueTask):
1998 def __init__(self, queue_entries):
1999 super(QueueTask, self).__init__(queue_entries)
2000 self._set_ids(queue_entries=queue_entries)
2001
2002
2003 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002004 self._check_queue_entry_statuses(
2005 self.queue_entries,
2006 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
2007 models.HostQueueEntry.Status.RUNNING),
2008 allowed_host_statuses=(models.Host.Status.PENDING,
2009 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00002010
2011 super(QueueTask, self).prolog()
2012
2013 for queue_entry in self.queue_entries:
2014 self._write_host_keyvals(queue_entry.host)
2015 queue_entry.host.set_status(models.Host.Status.RUNNING)
2016 queue_entry.host.update_field('dirty', 1)
showarda9545c02009-12-18 22:44:26 +00002017
2018
2019 def _finish_task(self):
2020 super(QueueTask, self)._finish_task()
2021
2022 for queue_entry in self.queue_entries:
2023 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00002024 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00002025
2026
mbligh4608b002010-01-05 18:22:35 +00002027class HostlessQueueTask(AbstractQueueTask):
2028 def __init__(self, queue_entry):
2029 super(HostlessQueueTask, self).__init__([queue_entry])
2030 self.queue_entry_ids = [queue_entry.id]
2031
2032
2033 def prolog(self):
2034 self.queue_entries[0].update_field('execution_subdir', 'hostless')
2035 super(HostlessQueueTask, self).prolog()
2036
2037
mbligh4608b002010-01-05 18:22:35 +00002038 def _finish_task(self):
2039 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00002040 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00002041
2042
showardd3dc1992009-04-22 21:01:40 +00002043class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00002044 def __init__(self, queue_entries, log_file_name):
2045 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00002046
showardd1195652009-12-08 22:21:02 +00002047 self.queue_entries = queue_entries
2048
showardd3dc1992009-04-22 21:01:40 +00002049 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00002050 self._autoserv_monitor.attach_to_existing_process(
2051 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00002052
showardd1195652009-12-08 22:21:02 +00002053
2054 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00002055 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00002056 return 'true'
2057 return self._generate_command(
2058 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00002059
2060
2061 def _generate_command(self, results_dir):
2062 raise NotImplementedError('Subclasses must override this')
2063
2064
showardd1195652009-12-08 22:21:02 +00002065 @property
2066 def owner_username(self):
2067 return self.queue_entries[0].job.owner
2068
2069
2070 def _working_directory(self):
2071 return self._get_consistent_execution_path(self.queue_entries)
2072
2073
2074 def _paired_with_monitor(self):
2075 return self._autoserv_monitor
2076
2077
showardd3dc1992009-04-22 21:01:40 +00002078 def _job_was_aborted(self):
2079 was_aborted = None
showardd1195652009-12-08 22:21:02 +00002080 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002081 queue_entry.update_from_database()
2082 if was_aborted is None: # first queue entry
2083 was_aborted = bool(queue_entry.aborted)
2084 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
jamesren17cadd62010-06-16 23:26:55 +00002085 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
2086 for entry in self.queue_entries]
showardd3dc1992009-04-22 21:01:40 +00002087 email_manager.manager.enqueue_notify_email(
jamesren17cadd62010-06-16 23:26:55 +00002088 'Inconsistent abort state',
2089 'Queue entries have inconsistent abort state:\n' +
2090 '\n'.join(entries))
showardd3dc1992009-04-22 21:01:40 +00002091 # don't crash here, just assume true
2092 return True
2093 return was_aborted
2094
2095
showardd1195652009-12-08 22:21:02 +00002096 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002097 if self._job_was_aborted():
2098 return models.HostQueueEntry.Status.ABORTED
2099
2100 # we'll use a PidfileRunMonitor to read the autoserv exit status
2101 if self._autoserv_monitor.exit_code() == 0:
2102 return models.HostQueueEntry.Status.COMPLETED
2103 return models.HostQueueEntry.Status.FAILED
2104
2105
showardd3dc1992009-04-22 21:01:40 +00002106 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002107 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002108 queue_entry.set_status(status)
2109
2110
2111 def abort(self):
2112 # override AgentTask.abort() to avoid killing the process and ending
2113 # the task. post-job tasks continue when the job is aborted.
2114 pass
2115
2116
mbligh4608b002010-01-05 18:22:35 +00002117 def _pidfile_label(self):
2118 # '.autoserv_execute' -> 'autoserv'
2119 return self._pidfile_name()[1:-len('_execute')]
2120
2121
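# Illustrative sketch (not part of the scheduler): the status resolution that
# PostJobTask._final_status() performs above, restated as a pure function. An
# aborted job wins over everything else; otherwise the autoserv exit code
# decides between COMPLETED and FAILED. The function name and parameters are
# hypothetical.
def _example_final_status(was_aborted, autoserv_exit_code):
    if was_aborted:
        return models.HostQueueEntry.Status.ABORTED
    if autoserv_exit_code == 0:
        return models.HostQueueEntry.Status.COMPLETED
    return models.HostQueueEntry.Status.FAILED

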
showard9bb960b2009-11-19 01:02:11 +00002122class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002123 """
2124 Task responsible for
2125 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2126 * copying logs to the results repository
2127 * spawning CleanupTasks for hosts, if necessary
2128 * spawning a FinalReparseTask for the job
2129 """
showardd1195652009-12-08 22:21:02 +00002130 def __init__(self, queue_entries, recover_run_monitor=None):
2131 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002132 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002133 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002134 self._set_ids(queue_entries=queue_entries)
2135
2136
Aviv Keshet308e7362013-05-21 14:43:16 -07002137 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd3dc1992009-04-22 21:01:40 +00002138 def _generate_command(self, results_dir):
2139 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002140 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00002141 return [_autoserv_path , '-p',
2142 '--pidfile-label=%s' % self._pidfile_label(),
2143 '--use-existing-results', '--collect-crashinfo',
2144 '-m', host_list, '-r', results_dir]
showardd3dc1992009-04-22 21:01:40 +00002145
2146
showardd1195652009-12-08 22:21:02 +00002147 @property
2148 def num_processes(self):
2149 return len(self.queue_entries)
2150
2151
2152 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002153 return drone_manager.CRASHINFO_PID_FILE
showardd1195652009-12-08 22:21:02 +00002154
2155
showardd3dc1992009-04-22 21:01:40 +00002156 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002157 self._check_queue_entry_statuses(
2158 self.queue_entries,
2159 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
2160 allowed_host_statuses=(models.Host.Status.RUNNING,))
showard8cc058f2009-09-08 16:26:33 +00002161
showardd3dc1992009-04-22 21:01:40 +00002162 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002163
2164
showardd3dc1992009-04-22 21:01:40 +00002165 def epilog(self):
2166 super(GatherLogsTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002167 self._parse_results(self.queue_entries)
showard9bb960b2009-11-19 01:02:11 +00002168 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002169
showard9bb960b2009-11-19 01:02:11 +00002170
2171 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002172 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002173 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002174 models.HostQueueEntry.Status.COMPLETED)
2175 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2176 else:
2177 final_success = False
2178 num_tests_failed = 0
showard9bb960b2009-11-19 01:02:11 +00002179 reboot_after = self._job.reboot_after
2180 do_reboot = (
2181 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002182 self._final_status() == models.HostQueueEntry.Status.ABORTED
jamesrendd855242010-03-02 22:23:44 +00002183 or reboot_after == model_attributes.RebootAfter.ALWAYS
2184 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
Dan Shi07e09af2013-04-12 09:31:29 -07002185 and final_success and num_tests_failed == 0)
2186 or num_tests_failed > 0)
showard9bb960b2009-11-19 01:02:11 +00002187
showardd1195652009-12-08 22:21:02 +00002188 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002189 if do_reboot:
2190 # don't pass the queue entry to the CleanupTask. if the cleanup
2191 # fails, the job doesn't care -- it's over.
2192 models.SpecialTask.objects.create(
2193 host=models.Host.objects.get(id=queue_entry.host.id),
2194 task=models.SpecialTask.Task.CLEANUP,
2195 requested_by=self._job.owner_model())
2196 else:
2197 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002198
2199
showard0bbfc212009-04-29 21:06:13 +00002200 def run(self):
showard597bfd32009-05-08 18:22:50 +00002201 autoserv_exit_code = self._autoserv_monitor.exit_code()
2202 # only run if Autoserv exited due to some signal. if we have no exit
2203 # code, assume something bad (and signal-like) happened.
2204 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002205 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002206 else:
2207 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002208
2209
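# Illustrative sketch (not part of the scheduler): the reboot decision that
# GatherLogsTask._reboot_hosts() makes above, restated as a pure function so
# the conditions are easier to read. The function name and parameters are
# hypothetical; the RebootAfter values come from model_attributes as in the
# real code.
def _example_should_reboot(final_status, reboot_after, final_success,
                           num_tests_failed):
    return (
            # always reboot after aborted jobs
            final_status == models.HostQueueEntry.Status.ABORTED
            or reboot_after == model_attributes.RebootAfter.ALWAYS
            or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                and final_success and num_tests_failed == 0)
            # and reboot whenever any test failed
            or num_tests_failed > 0)

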
mbligh4608b002010-01-05 18:22:35 +00002210class SelfThrottledPostJobTask(PostJobTask):
2211 """
2212 Special AgentTask subclass that maintains its own global process limit.
2213 """
2214 _num_running_processes = 0
showarded2afea2009-07-07 20:54:07 +00002215
2216
mbligh4608b002010-01-05 18:22:35 +00002217 @classmethod
2218 def _increment_running_processes(cls):
2219 cls._num_running_processes += 1
mbligh16c722d2008-03-05 00:58:44 +00002220
mblighd5c95802008-03-05 00:33:46 +00002221
mbligh4608b002010-01-05 18:22:35 +00002222 @classmethod
2223 def _decrement_running_processes(cls):
2224 cls._num_running_processes -= 1
showard8cc058f2009-09-08 16:26:33 +00002225
2226
mbligh4608b002010-01-05 18:22:35 +00002227 @classmethod
2228 def _max_processes(cls):
2229 raise NotImplementedError
2230
2231
2232 @classmethod
2233 def _can_run_new_process(cls):
2234 return cls._num_running_processes < cls._max_processes()
2235
2236
2237 def _process_started(self):
2238 return bool(self.monitor)
2239
2240
2241 def tick(self):
2242 # override tick to keep trying to start until the process count goes
2243 # down and we can, at which point we revert to default behavior
2244 if self._process_started():
2245 super(SelfThrottledPostJobTask, self).tick()
2246 else:
2247 self._try_starting_process()
2248
2249
2250 def run(self):
2251 # override run() to not actually run unless we can
2252 self._try_starting_process()
2253
2254
2255 def _try_starting_process(self):
2256 if not self._can_run_new_process():
showard775300b2009-09-09 15:30:50 +00002257 return
2258
mbligh4608b002010-01-05 18:22:35 +00002259 # actually run the command
2260 super(SelfThrottledPostJobTask, self).run()
jamesren25663562010-04-27 18:00:55 +00002261 if self._process_started():
2262 self._increment_running_processes()
mblighd5c95802008-03-05 00:33:46 +00002263
mblighd5c95802008-03-05 00:33:46 +00002264
mbligh4608b002010-01-05 18:22:35 +00002265 def finished(self, success):
2266 super(SelfThrottledPostJobTask, self).finished(success)
2267 if self._process_started():
2268 self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002269
showard21baa452008-10-21 00:08:39 +00002270
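# Illustrative sketch (not part of the scheduler): the minimum a
# SelfThrottledPostJobTask subclass must provide, mirroring the real
# FinalReparseTask and ArchiveResultsTask below. The command, pidfile name and
# process limit used here are made up.
class _ExampleThrottledTask(SelfThrottledPostJobTask):
    def _generate_command(self, results_dir):
        # hypothetical no-op command run against the results directory
        return ['true', results_dir]


    def _pidfile_name(self):
        return '.example_execute'  # hypothetical pidfile name


    @classmethod
    def _max_processes(cls):
        return 1  # allow at most one such process at a time

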
mbligh4608b002010-01-05 18:22:35 +00002271class FinalReparseTask(SelfThrottledPostJobTask):
showardd1195652009-12-08 22:21:02 +00002272 def __init__(self, queue_entries):
2273 super(FinalReparseTask, self).__init__(queue_entries,
2274 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002275 # don't use _set_ids, since we don't want to set the host_ids
2276 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002277
2278
2279 def _generate_command(self, results_dir):
mbligh4608b002010-01-05 18:22:35 +00002280 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd1195652009-12-08 22:21:02 +00002281 results_dir]
2282
2283
2284 @property
2285 def num_processes(self):
2286 return 0 # don't include parser processes in accounting
2287
2288
2289 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002290 return drone_manager.PARSER_PID_FILE
showardd1195652009-12-08 22:21:02 +00002291
2292
showard97aed502008-11-04 02:01:24 +00002293 @classmethod
mbligh4608b002010-01-05 18:22:35 +00002294 def _max_processes(cls):
2295 return scheduler_config.config.max_parse_processes
showard97aed502008-11-04 02:01:24 +00002296
2297
2298 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002299 self._check_queue_entry_statuses(
2300 self.queue_entries,
2301 allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
showard8cc058f2009-09-08 16:26:33 +00002302
showard97aed502008-11-04 02:01:24 +00002303 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002304
2305
2306 def epilog(self):
2307 super(FinalReparseTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002308 self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002309
2310
mbligh4608b002010-01-05 18:22:35 +00002311class ArchiveResultsTask(SelfThrottledPostJobTask):
showarde1575b52010-01-15 00:21:12 +00002312 _ARCHIVING_FAILED_FILE = '.archiver_failed'
2313
mbligh4608b002010-01-05 18:22:35 +00002314 def __init__(self, queue_entries):
2315 super(ArchiveResultsTask, self).__init__(queue_entries,
2316 log_file_name='.archiving.log')
2317 # don't use _set_ids, since we don't want to set the host_ids
2318 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00002319
2320
mbligh4608b002010-01-05 18:22:35 +00002321 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002322 return drone_manager.ARCHIVER_PID_FILE
showard97aed502008-11-04 02:01:24 +00002323
2324
Aviv Keshet308e7362013-05-21 14:43:16 -07002325 # TODO: Refactor into autoserv_utils. crbug.com/243090
mbligh4608b002010-01-05 18:22:35 +00002326 def _generate_command(self, results_dir):
2327 return [_autoserv_path , '-p',
2328 '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
mblighe0cbc912010-03-11 18:03:07 +00002329 '--use-existing-results', '--control-filename=control.archive',
showard948eb302010-01-15 00:16:20 +00002330 os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
2331 'archive_results.control.srv')]
showard97aed502008-11-04 02:01:24 +00002332
2333
mbligh4608b002010-01-05 18:22:35 +00002334 @classmethod
2335 def _max_processes(cls):
2336 return scheduler_config.config.max_transfer_processes
showarda9545c02009-12-18 22:44:26 +00002337
2338
2339 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002340 self._check_queue_entry_statuses(
2341 self.queue_entries,
2342 allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
2343
2344 super(ArchiveResultsTask, self).prolog()
showarda9545c02009-12-18 22:44:26 +00002345
2346
mbligh4608b002010-01-05 18:22:35 +00002347 def epilog(self):
2348 super(ArchiveResultsTask, self).epilog()
showard4076c632010-01-15 20:28:49 +00002349 if not self.success and self._paired_with_monitor().has_process():
showarde1575b52010-01-15 00:21:12 +00002350 failed_file = os.path.join(self._working_directory(),
2351 self._ARCHIVING_FAILED_FILE)
2352 paired_process = self._paired_with_monitor().get_process()
2353 _drone_manager.write_lines_to_file(
2354 failed_file, ['Archiving failed with exit code %s'
2355 % self.monitor.exit_code()],
2356 paired_with_process=paired_process)
mbligh4608b002010-01-05 18:22:35 +00002357 self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002358
2359
mbligh36768f02008-02-22 18:28:33 +00002360if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002361 main()