blob: 047b889779a6238b1ba8ea001bf0cdcdc5db83d6 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Aviv Keshet225bdfe2013-03-05 10:10:08 -08002#pylint: disable-msg=C0111
mbligh36768f02008-02-22 18:28:33 +00003
4"""
5Autotest scheduler
6"""
showard909c7a62008-07-15 21:52:38 +00007
Aviv Keshet225bdfe2013-03-05 10:10:08 -08008import datetime, optparse, os, signal
beeps5e2bb4a2013-10-28 11:26:45 -07009import sys, time
Aviv Keshet225bdfe2013-03-05 10:10:08 -080010import logging, gc
showard402934a2009-12-21 22:20:47 +000011
Alex Miller05d7b4c2013-03-04 07:49:38 -080012import common
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000014
15import django.db
16
Prashanth B0e960282014-05-13 19:38:28 -070017from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070018from autotest_lib.client.common_lib import utils
Prashanth B0e960282014-05-13 19:38:28 -070019from autotest_lib.frontend.afe import models, rpc_utils
beeps5e2bb4a2013-10-28 11:26:45 -070020from autotest_lib.scheduler import agent_task, drone_manager, drones
21from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
22from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070023from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070024from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070025from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000026from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080027from autotest_lib.scheduler import status_server, scheduler_config
Prashanth Bf66d51b2014-05-06 12:42:25 -070028from autotest_lib.scheduler import scheduler_lib
Aviv Keshet308e7362013-05-21 14:43:16 -070029from autotest_lib.server import autoserv_utils
Fang Deng1d6c2a02013-04-17 15:25:45 -070030from autotest_lib.site_utils.graphite import stats
Alex Miller05d7b4c2013-03-04 07:49:38 -080031
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the environment to override the autotest root (e.g. dev checkouts).
# NOTE: use the `in` operator rather than the deprecated dict.has_key(),
# which was removed in Python 3 and discouraged in Python 2.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level scheduler state; set up in initialize().
_db_manager = None
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)
64
mbligh36768f02008-02-22 18:28:33 +000065
Eric Lie0493a42010-11-15 13:05:43 -080066def _parser_path_default(install_dir):
67 return os.path.join(install_dir, 'tko', 'parse')
# Let site-specific code override where the results parser lives; falls
# back to <install_dir>/tko/parse via _parser_path_default.
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
72
mbligh36768f02008-02-22 18:28:33 +000073
mbligh83c1e9e2009-05-01 23:10:41 +000074def _site_init_monitor_db_dummy():
75 return {}
76
77
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled but no default exists."""
    enabled = models.DroneSet.drone_sets_enabled()
    if enabled and not models.DroneSet.default_drone_set_name():
        raise scheduler_lib.SchedulerError(
                'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000083
84
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    # Currently the only consistency check is the drone-set configuration.
    _verify_default_drone_set_exists()
88
89
def main():
    """Scheduler entry point.

    Runs main_without_exception_handling(), logging any exception that
    escapes (SystemExit excepted) before re-raising it, and always removes
    the scheduler pid file on the way out.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000101
102
def main_without_exception_handling():
    """Parse options, perform one-time setup, then run the scheduler loop.

    Exits with status 1 when the scheduler is disabled in the global config.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Site hook; defaults to the no-op _site_init_monitor_db_dummy.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop: one tick per iteration until a signal or the status
        # server requests shutdown.
        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except Exception:
        email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    # Orderly teardown: flush emails, then stop server, drones and DB.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000170
171
def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler: flag the main loop to shut down gracefully.

    @param signum: signal number (unused).
    @param frame: interrupted stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000176
177
def initialize():
    """One-time startup: pid file, DB connection, signal handlers, drones."""
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start if another scheduler instance owns the pid file.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    # Under --test, point the DB layer at the stress-test database instead.
    if _testing_mode:
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    # Drones are the machines that run autoserv processes on the
    # scheduler's behalf; comma-separated list in the config.
    drones = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000210
211
def initialize_globals():
    """Bind the module-level _drone_manager to the drone manager singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
215
216
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - bool - Passed through to autoserv_run_job_command;
            presumably controls autoserv's --verbose flag (confirm there).
    """
    return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
showard87ba02a2009-04-20 19:37:32 +0000234
235
class BaseDispatcher(object):
    """Core scheduler object: owns all Agents and drives them each tick."""


    def __init__(self):
        # Active Agent objects currently managed by this dispatcher.
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db_manager.get_connection(), user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db_manager.get_connection())
        # Maps host id -> set of agents, and queue entry id -> set of agents.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        # Rate-limiting state for _garbage_collection().
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        # Debug-logging switches consumed by _log_tick_msg/_log_extra_msg.
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
270
mbligh36768f02008-02-22 18:28:33 +0000271
    def initialize(self, recover_hosts=True):
        """Run recovery at startup.

        @param recover_hosts: if True, also attempt to recover dead hosts.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000281
282
Simran Basi0ec94dd2012-08-28 09:50:10 -0700283 def _log_tick_msg(self, msg):
284 if self._tick_debug:
285 logging.debug(msg)
286
287
Simran Basidef92872012-09-20 13:34:34 -0700288 def _log_extra_msg(self, msg):
289 if self._extra_debugging:
290 logging.debug(msg)
291
292
    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.

        The call order below is significant: recovery/refresh steps run
        before scheduling steps, and drone actions are flushed at the end.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000334
showard97aed502008-11-04 02:01:24 +0000335
    def _run_cleanup(self):
        # Both helpers rate-limit themselves (note the "maybe"), so calling
        # them every tick is expected to be cheap.
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000339
mbligh36768f02008-02-22 18:28:33 +0000340
showardf13a9e22009-12-18 22:54:09 +0000341 def _garbage_collection(self):
342 threshold_time = time.time() - self._seconds_between_garbage_stats
343 if threshold_time < self._last_garbage_stats_time:
344 # Don't generate these reports very often.
345 return
346
347 self._last_garbage_stats_time = time.time()
348 # Force a full level 0 collection (because we can, it doesn't hurt
349 # at this interval).
350 gc.collect()
351 logging.info('Logging garbage collector stats on tick %d.',
352 self._tick_count)
353 gc_stats._log_garbage_collector_stats()
354
355
showard170873e2009-01-07 00:22:26 +0000356 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
357 for object_id in object_ids:
358 agent_dict.setdefault(object_id, set()).add(agent)
359
360
361 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
362 for object_id in object_ids:
363 assert object_id in agent_dict
364 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700365 # If an ID has no more active agent associated, there is no need to
366 # keep it in the dictionary. Otherwise, scheduler will keep an
367 # unnecessarily big dictionary until being restarted.
368 if not agent_dict[object_id]:
369 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000370
371
    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatchers list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to 1. a dict against the queue_entry_ids given to it 2. A dict
        against the host_ids given to it. So theoretically, a host can have any
        number of agents associated with it, and each of them can have any
        special agent task, though in practice we never see > 1 agent/task per
        host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        # Back-reference so the agent can reach the dispatcher (e.g. for
        # removal via remove_agent).
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000392
showard170873e2009-01-07 00:22:26 +0000393
394 def get_agents_for_entry(self, queue_entry):
395 """
396 Find agents corresponding to the specified queue_entry.
397 """
showardd3dc1992009-04-22 21:01:40 +0000398 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000399
400
401 def host_has_agent(self, host):
402 """
403 Determine if there is currently an Agent present using this host.
404 """
405 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000406
407
jadmanski0afbb632008-06-06 21:10:57 +0000408 def remove_agent(self, agent):
409 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000410 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
411 agent)
412 self._unregister_agent_for_ids(self._queue_entry_agents,
413 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000414
415
    def _host_has_scheduled_special_task(self, host):
        # True if any SpecialTask for this host is still queued, i.e.
        # neither active nor complete yet.
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))
420
421
    def _recover_processes(self):
        """Reattach to autoserv/special-task processes left over from a
        previous scheduler run, then reverify anything unaccounted for."""
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000434
showard170873e2009-01-07 00:22:26 +0000435
    def _create_recovery_agent_tasks(self):
        # Recovery covers both in-flight queue entries and active
        # special tasks.
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))
439
440
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)
        stats.Gauge('scheduler.jobs_per_tick').send(
                'running', len(queue_entries))

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            # One task may cover a whole group of entries; mark them used.
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000479
480
    def _get_special_task_agent_tasks(self, is_active=False):
        """Build AgentTasks for all incomplete SpecialTasks.

        @param is_active: restrict to tasks already marked active
                (used during recovery).
        """
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]
486
487
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        @raises SchedulerError: if the entry's status maps to no task type.
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        # A host must never be claimed by more than one agent at a time.
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000513
514
    def _check_for_duplicate_host_entries(self, task_entries):
        """Assert no entry's host is already claimed by another agent.

        Entries in Parsing/Archiving are treated as no longer occupying
        their host and are exempt from the check.
        """
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)
523
524
    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        @raises SchedulerError: if entry.host already has a registered agent.
        """
        if self.host_has_agent(entry.host):
            # host_has_agent guarantees the set is non-empty; grab any one
            # agent for the error message.
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise scheduler_lib.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))
534
535
    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
        object contains a hqe it's passed on to the special agent task, which
        creates a HostQueueEntry and saves it as its queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        @raises SchedulerError: if no AgentTask class matches the task type.
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        # Dispatch on the TASK_TYPE declared by each candidate class.
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise scheduler_lib.SchedulerError(
                'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000566
567
568 def _register_pidfiles(self, agent_tasks):
569 for agent_task in agent_tasks:
570 agent_task.register_necessary_pidfiles()
571
572
    def _recover_tasks(self, agent_tasks):
        """Reattach recovery tasks to their processes; any autoserv process
        not claimed by some task is treated as an orphan."""
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            # A recovered task that found its process claims it, so it is
            # no longer considered orphaned.
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000583
584
    def _get_unassigned_entries(self, status):
        """Yield queue entries currently in *status* with no agent handling
        them."""
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                           % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry
592
593
    def _check_for_remaining_orphan_processes(self, orphans):
        """Email a report of unclaimed orphan processes.

        @param orphans: processes that no recovery task claimed.
        @raises RuntimeError: if the die_on_orphans config option is set
                and any orphans remain.
        """
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000606
showard170873e2009-01-07 00:22:26 +0000607
showard8cc058f2009-09-08 16:26:33 +0000608 def _recover_pending_entries(self):
609 for entry in self._get_unassigned_entries(
610 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000611 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000612 entry.on_pending()
613
614
showardb8900452009-10-12 20:31:01 +0000615 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000616 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000617 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
618 unrecovered_hqes = []
619 for queue_entry in queue_entries:
620 special_tasks = models.SpecialTask.objects.filter(
621 task__in=(models.SpecialTask.Task.CLEANUP,
622 models.SpecialTask.Task.VERIFY),
623 queue_entry__id=queue_entry.id,
624 is_complete=False)
625 if special_tasks.count() == 0:
626 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000627
showardb8900452009-10-12 20:31:01 +0000628 if unrecovered_hqes:
629 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700630 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000631 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000632 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000633
634
showard65db3932009-10-28 19:54:35 +0000635 def _schedule_special_tasks(self):
636 """
637 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700638
639 Special tasks include PreJobTasks like verify, reset and cleanup.
640 They are created through _schedule_new_jobs and associated with a hqe
641 This method translates SpecialTasks to the appropriate AgentTask and
642 adds them to the dispatchers agents list, so _handle_agents can execute
643 them.
showard65db3932009-10-28 19:54:35 +0000644 """
Prashanth B4ec98672014-05-15 10:44:54 -0700645 # When the host scheduler is responsible for acquisition we only want
646 # to run tasks with leased hosts. All hqe tasks will already have
647 # leased hosts, and we don't want to run frontend tasks till the host
648 # scheduler has vetted the assignment. Note that this doesn't include
649 # frontend tasks with hosts leased by other active hqes.
650 for task in self._job_query_manager.get_prioritized_special_tasks(
651 only_tasks_with_leased_hosts=not _inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000652 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000653 continue
showardd1195652009-12-08 22:21:02 +0000654 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000655
656
showard170873e2009-01-07 00:22:26 +0000657 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000658 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000659 # should never happen
showarded2afea2009-07-07 20:54:07 +0000660 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000661 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000662 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700663 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000664 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000665
666
jadmanski0afbb632008-06-06 21:10:57 +0000667 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000668 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700669 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000670 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000671 if self.host_has_agent(host):
672 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000673 continue
showard8cc058f2009-09-08 16:26:33 +0000674 if self._host_has_scheduled_special_task(host):
675 # host will have a special task scheduled on the next cycle
676 continue
showard170873e2009-01-07 00:22:26 +0000677 if print_message:
showardb18134f2009-03-20 20:52:18 +0000678 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000679 models.SpecialTask.objects.create(
680 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000681 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000682
683
jadmanski0afbb632008-06-06 21:10:57 +0000684 def _recover_hosts(self):
685 # recover "Repair Failed" hosts
686 message = 'Reverifying dead host %s'
687 self._reverify_hosts_where("status = 'Repair Failed'",
688 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000689
690
showard89f84db2009-03-12 20:39:13 +0000691 def _refresh_pending_queue_entries(self):
692 """
693 Lookup the pending HostQueueEntries and call our HostScheduler
694 refresh() method given that list. Return the list.
695
696 @returns A list of pending HostQueueEntries sorted in priority order.
697 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700698 queue_entries = self._job_query_manager.get_pending_queue_entries(
699 only_hostless=not _inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000700 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000701 return []
showard89f84db2009-03-12 20:39:13 +0000702 return queue_entries
703
704
showarda9545c02009-12-18 22:44:26 +0000705 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800706 """Schedule a hostless (suite) job.
707
708 @param queue_entry: The queue_entry representing the hostless job.
709 """
showarda9545c02009-12-18 22:44:26 +0000710 self.add_agent_task(HostlessQueueTask(queue_entry))
jamesren47bd7372010-03-13 00:58:17 +0000711 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000712
713
beepscc9fc702013-12-02 12:45:38 -0800714 def _schedule_host_job(self, host, queue_entry):
715 """Schedules a job on the given host.
716
717 1. Assign the host to the hqe, if it isn't already assigned.
718 2. Create a SpecialAgentTask for the hqe.
719 3. Activate the hqe.
720
721 @param queue_entry: The job to schedule.
722 @param host: The host to schedule the job on.
723 """
724 if self.host_has_agent(host):
725 host_agent_task = list(self._host_agents.get(host.id))[0].task
726 subject = 'Host with agents assigned to an HQE'
727 message = ('HQE: %s assigned host %s, but the host has '
728 'agent: %s for queue_entry %s. The HQE '
beepsf7d35162014-02-13 16:42:13 -0800729 'will have to try and acquire a host next tick ' %
beepscc9fc702013-12-02 12:45:38 -0800730 (queue_entry, host.hostname, host_agent_task,
731 host_agent_task.queue_entry))
732 email_manager.manager.enqueue_notify_email(subject, message)
beepscc9fc702013-12-02 12:45:38 -0800733 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700734 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800735
736
showard89f84db2009-03-12 20:39:13 +0000737 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700738 """
739 Find any new HQEs and call schedule_pre_job_tasks for it.
740
741 This involves setting the status of the HQE and creating a row in the
742 db corresponding the the special task, through
743 scheduler_models._queue_special_task. The new db row is then added as
744 an agent to the dispatcher through _schedule_special_tasks and
745 scheduled for execution on the drone through _handle_agents.
746 """
showard89f84db2009-03-12 20:39:13 +0000747 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000748
beepscc9fc702013-12-02 12:45:38 -0800749 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700750 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700751 new_jobs_with_hosts = 0
752 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800753 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700754 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000755
beepscc9fc702013-12-02 12:45:38 -0800756 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000757 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000758 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700759 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000760 else:
beepscc9fc702013-12-02 12:45:38 -0800761 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700762 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700763
beepsb255fc52013-10-13 23:28:54 -0700764 stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
beepscc9fc702013-12-02 12:45:38 -0800765 if not host_jobs:
766 return
Prashanth Bf66d51b2014-05-06 12:42:25 -0700767 if not _inline_host_acquisition:
768 message = ('Found %s jobs that need hosts though '
769 '_inline_host_acquisition=%s. Will acquire hosts.' %
770 ([str(job) for job in host_jobs],
771 _inline_host_acquisition))
772 email_manager.manager.enqueue_notify_email(
773 'Processing unexpected host acquisition requests', message)
774 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
775 for host_assignment in jobs_with_hosts:
776 self._schedule_host_job(host_assignment.host, host_assignment.job)
777 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800778
beepsb255fc52013-10-13 23:28:54 -0700779 stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
780 stats.Gauge(key).send('new_jobs_without_hosts',
781 new_jobs_need_hosts - new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000782
783
showard8cc058f2009-09-08 16:26:33 +0000784 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700785 """
786 Adds agents to the dispatcher.
787
788 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
789 QueueTask for example, will have a job with a control file, and
790 the agent will have methods that poll, abort and check if the queue
791 task is finished. The dispatcher runs the agent_task, as well as
792 other agents in it's _agents member, through _handle_agents, by
793 calling the Agents tick().
794
795 This method creates an agent for each HQE in one of (starting, running,
796 gathering, parsing, archiving) states, and adds it to the dispatcher so
797 it is handled by _handle_agents.
798 """
showardd1195652009-12-08 22:21:02 +0000799 for agent_task in self._get_queue_entry_agent_tasks():
800 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000801
802
803 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000804 for entry in scheduler_models.HostQueueEntry.fetch(
805 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000806 task = entry.job.schedule_delayed_callback_task(entry)
807 if task:
showardd1195652009-12-08 22:21:02 +0000808 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000809
810
jadmanski0afbb632008-06-06 21:10:57 +0000811 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700812 """
813 Looks through the afe_host_queue_entries for an aborted entry.
814
815 The aborted bit is set on an HQE in many ways, the most common
816 being when a user requests an abort through the frontend, which
817 results in an rpc from the afe to abort_host_queue_entries.
818 """
jamesrene7c65cb2010-06-08 20:38:10 +0000819 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000820 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700821 where='aborted=1 and complete=0'):
showardf4a2e502009-07-28 20:06:39 +0000822 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800823
824 # The task would have started off with both is_complete and
825 # is_active = False. Aborted tasks are neither active nor complete.
826 # For all currently active tasks this will happen through the agent,
827 # but we need to manually update the special tasks that haven't
828 # started yet, because they don't have agents.
829 models.SpecialTask.objects.filter(is_active=False,
830 queue_entry_id=entry.id).update(is_complete=True)
831
showardd3dc1992009-04-22 21:01:40 +0000832 for agent in self.get_agents_for_entry(entry):
833 agent.abort()
834 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000835 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700836 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000837 for job in jobs_to_stop:
838 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000839
840
beeps8bb1f7d2013-08-05 01:30:09 -0700841 def _find_aborted_special_tasks(self):
842 """
843 Find SpecialTasks that have been marked for abortion.
844
845 Poll the database looking for SpecialTasks that are active
846 and have been marked for abortion, then abort them.
847 """
848
849 # The completed and active bits are very important when it comes
850 # to scheduler correctness. The active bit is set through the prolog
851 # of a special task, and reset through the cleanup method of the
852 # SpecialAgentTask. The cleanup is called both through the abort and
853 # epilog. The complete bit is set in several places, and in general
854 # a hanging job will have is_active=1 is_complete=0, while a special
855 # task which completed will have is_active=0 is_complete=1. To check
856 # aborts we directly check active because the complete bit is set in
857 # several places, including the epilog of agent tasks.
858 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
859 is_aborted=True)
860 for task in aborted_tasks:
861 # There are 2 ways to get the agent associated with a task,
862 # through the host and through the hqe. A special task
863 # always needs a host, but doesn't always need a hqe.
864 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700865 if isinstance(agent.task, agent_task.SpecialAgentTask):
beeps8bb1f7d2013-08-05 01:30:09 -0700866
867 # The epilog preforms critical actions such as
868 # queueing the next SpecialTask, requeuing the
869 # hqe etc, however it doesn't actually kill the
870 # monitor process and set the 'done' bit. Epilogs
871 # assume that the job failed, and that the monitor
872 # process has already written an exit code. The
873 # done bit is a necessary condition for
874 # _handle_agents to schedule any more special
875 # tasks against the host, and it must be set
876 # in addition to is_active, is_complete and success.
877 agent.task.epilog()
878 agent.task.abort()
879
880
showard324bf812009-01-20 23:23:38 +0000881 def _can_start_agent(self, agent, num_started_this_cycle,
882 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000883 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000884 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000885 return True
886 # don't allow any nonzero-process agents to run after we've reached a
887 # limit (this avoids starvation of many-process agents)
888 if have_reached_limit:
889 return False
890 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000891 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000892 agent.task.owner_username,
893 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000894 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000895 return False
896 # if a single agent exceeds the per-cycle throttling, still allow it to
897 # run when it's the first agent in the cycle
898 if num_started_this_cycle == 0:
899 return True
900 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +0000901 if (num_started_this_cycle + agent.task.num_processes >
902 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000903 return False
904 return True
905
906
jadmanski0afbb632008-06-06 21:10:57 +0000907 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700908 """
909 Handles agents of the dispatcher.
910
911 Appropriate Agents are added to the dispatcher through
912 _schedule_running_host_queue_entries. These agents each
913 have a task. This method runs the agents task through
914 agent.tick() leading to:
915 agent.start
916 prolog -> AgentTasks prolog
917 For each queue entry:
918 sets host status/status to Running
919 set started_on in afe_host_queue_entries
920 run -> AgentTasks run
921 Creates PidfileRunMonitor
922 Queues the autoserv command line for this AgentTask
923 via the drone manager. These commands are executed
924 through the drone managers execute actions.
925 poll -> AgentTasks/BaseAgentTask poll
926 checks the monitors exit_code.
927 Executes epilog if task is finished.
928 Executes AgentTasks _finish_task
929 finish_task is usually responsible for setting the status
930 of the HQE/host, and updating it's active and complete fileds.
931
932 agent.is_done
933 Removed the agent from the dispatchers _agents queue.
934 Is_done checks the finished bit on the agent, that is
935 set based on the Agents task. During the agents poll
936 we check to see if the monitor process has exited in
937 it's finish method, and set the success member of the
938 task based on this exit code.
939 """
jadmanski0afbb632008-06-06 21:10:57 +0000940 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000941 have_reached_limit = False
942 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -0700943 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +0000944 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -0700945 self._log_extra_msg('Processing Agent with Host Ids: %s and '
946 'queue_entry ids:%s' % (agent.host_ids,
947 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +0000948 if not agent.started:
showard324bf812009-01-20 23:23:38 +0000949 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000950 have_reached_limit):
951 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -0700952 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +0000953 continue
showardd1195652009-12-08 22:21:02 +0000954 num_started_this_cycle += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -0700955 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +0000956 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -0700957 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +0000958 if agent.is_done():
Simran Basidef92872012-09-20 13:34:34 -0700959 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +0000960 self.remove_agent(agent)
Simran Basi3f6717d2012-09-13 15:21:22 -0700961 logging.info('%d running processes. %d added this cycle.',
962 _drone_manager.total_running_processes(),
963 num_started_this_cycle)
mbligh36768f02008-02-22 18:28:33 +0000964
965
showard29f7cd22009-04-29 21:16:24 +0000966 def _process_recurring_runs(self):
967 recurring_runs = models.RecurringRun.objects.filter(
968 start_date__lte=datetime.datetime.now())
969 for rrun in recurring_runs:
970 # Create job from template
971 job = rrun.job
972 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +0000973 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +0000974
975 host_objects = info['hosts']
976 one_time_hosts = info['one_time_hosts']
977 metahost_objects = info['meta_hosts']
978 dependencies = info['dependencies']
979 atomic_group = info['atomic_group']
980
981 for host in one_time_hosts or []:
982 this_host = models.Host.create_one_time_host(host.hostname)
983 host_objects.append(this_host)
984
985 try:
986 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +0000987 options=options,
showard29f7cd22009-04-29 21:16:24 +0000988 host_objects=host_objects,
989 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +0000990 atomic_group=atomic_group)
991
992 except Exception, ex:
993 logging.exception(ex)
994 #TODO send email
995
996 if rrun.loop_count == 1:
997 rrun.delete()
998 else:
999 if rrun.loop_count != 0: # if not infinite loop
1000 # calculate new start_date
1001 difference = datetime.timedelta(seconds=rrun.loop_period)
1002 rrun.start_date = rrun.start_date + difference
1003 rrun.loop_count -= 1
1004 rrun.save()
1005
1006
Simran Basia858a232012-08-21 11:04:37 -07001007SiteDispatcher = utils.import_site_class(
1008 __file__, 'autotest_lib.scheduler.site_monitor_db',
1009 'SiteDispatcher', BaseDispatcher)
1010
1011class Dispatcher(SiteDispatcher):
1012 pass
1013
1014
mbligh36768f02008-02-22 18:28:33 +00001015class Agent(object):
showard77182562009-06-10 00:16:05 +00001016 """
Alex Miller47715eb2013-07-24 03:34:01 -07001017 An agent for use by the Dispatcher class to perform a task. An agent wraps
1018 around an AgentTask mainly to associate the AgentTask with the queue_entry
1019 and host ids.
showard77182562009-06-10 00:16:05 +00001020
1021 The following methods are required on all task objects:
1022 poll() - Called periodically to let the task check its status and
1023 update its internal state. If the task succeeded.
1024 is_done() - Returns True if the task is finished.
1025 abort() - Called when an abort has been requested. The task must
1026 set its aborted attribute to True if it actually aborted.
1027
1028 The following attributes are required on all task objects:
1029 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001030 success - bool, True if this task succeeded.
1031 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1032 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001033 """
1034
1035
showard418785b2009-11-23 20:19:59 +00001036 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001037 """
Alex Miller47715eb2013-07-24 03:34:01 -07001038 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001039 """
showard8cc058f2009-09-08 16:26:33 +00001040 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001041
showard77182562009-06-10 00:16:05 +00001042 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001043 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001044
showard8cc058f2009-09-08 16:26:33 +00001045 self.queue_entry_ids = task.queue_entry_ids
1046 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001047
showard8cc058f2009-09-08 16:26:33 +00001048 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001049 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001050
1051
jadmanski0afbb632008-06-06 21:10:57 +00001052 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001053 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001054 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001055 self.task.poll()
1056 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001057 self.finished = True
showardec113162008-05-08 00:52:49 +00001058
1059
jadmanski0afbb632008-06-06 21:10:57 +00001060 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001061 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001062
1063
showardd3dc1992009-04-22 21:01:40 +00001064 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001065 if self.task:
1066 self.task.abort()
1067 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001068 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001069 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001070
showardd3dc1992009-04-22 21:01:40 +00001071
beeps5e2bb4a2013-10-28 11:26:45 -07001072class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001073 """
1074 Common functionality for QueueTask and HostlessQueueTask
1075 """
1076 def __init__(self, queue_entries):
1077 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001078 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001079 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001080
1081
showard73ec0442009-02-07 02:05:20 +00001082 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001083 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001084
1085
jamesrenc44ae992010-02-19 00:12:54 +00001086 def _write_control_file(self, execution_path):
1087 control_path = _drone_manager.attach_file_to_execution(
1088 execution_path, self.job.control_file)
1089 return control_path
1090
1091
Aviv Keshet308e7362013-05-21 14:43:16 -07001092 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001093 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001094 execution_path = self.queue_entries[0].execution_path()
1095 control_path = self._write_control_file(execution_path)
1096 hostnames = ','.join(entry.host.hostname
1097 for entry in self.queue_entries
1098 if not entry.is_hostless())
1099
1100 execution_tag = self.queue_entries[0].execution_tag()
1101 params = _autoserv_command_line(
1102 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001103 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001104 _drone_manager.absolute_path(control_path)],
1105 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001106 if self.job.is_image_update_job():
1107 params += ['--image', self.job.update_image_path]
1108
jamesrenc44ae992010-02-19 00:12:54 +00001109 return params
showardd1195652009-12-08 22:21:02 +00001110
1111
1112 @property
1113 def num_processes(self):
1114 return len(self.queue_entries)
1115
1116
1117 @property
1118 def owner_username(self):
1119 return self.job.owner
1120
1121
1122 def _working_directory(self):
1123 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001124
1125
jadmanski0afbb632008-06-06 21:10:57 +00001126 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001127 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001128 keyval_dict = self.job.keyval_dict()
1129 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001130 group_name = self.queue_entries[0].get_group_name()
1131 if group_name:
1132 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001133 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001134 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001135 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001136 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001137
1138
showard35162b02009-03-03 02:17:30 +00001139 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001140 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001141 _drone_manager.write_lines_to_file(error_file_path,
1142 [_LOST_PROCESS_ERROR])
1143
1144
showardd3dc1992009-04-22 21:01:40 +00001145 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001146 if not self.monitor:
1147 return
1148
showardd9205182009-04-27 20:09:55 +00001149 self._write_job_finished()
1150
showard35162b02009-03-03 02:17:30 +00001151 if self.monitor.lost_process:
1152 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001153
jadmanskif7fa2cc2008-10-01 14:13:23 +00001154
showardcbd74612008-11-19 21:42:02 +00001155 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001156 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001157 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001158 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001159 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001160
1161
jadmanskif7fa2cc2008-10-01 14:13:23 +00001162 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001163 if not self.monitor or not self.monitor.has_process():
1164 return
1165
jadmanskif7fa2cc2008-10-01 14:13:23 +00001166 # build up sets of all the aborted_by and aborted_on values
1167 aborted_by, aborted_on = set(), set()
1168 for queue_entry in self.queue_entries:
1169 if queue_entry.aborted_by:
1170 aborted_by.add(queue_entry.aborted_by)
1171 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1172 aborted_on.add(t)
1173
1174 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001175 # TODO(showard): this conditional is now obsolete, we just need to leave
1176 # it in temporarily for backwards compatibility over upgrades. delete
1177 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001178 assert len(aborted_by) <= 1
1179 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001180 aborted_by_value = aborted_by.pop()
1181 aborted_on_value = max(aborted_on)
1182 else:
1183 aborted_by_value = 'autotest_system'
1184 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001185
showarda0382352009-02-11 23:36:43 +00001186 self._write_keyval_after_job("aborted_by", aborted_by_value)
1187 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001188
showardcbd74612008-11-19 21:42:02 +00001189 aborted_on_string = str(datetime.datetime.fromtimestamp(
1190 aborted_on_value))
1191 self._write_status_comment('Job aborted by %s on %s' %
1192 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001193
1194
jadmanski0afbb632008-06-06 21:10:57 +00001195 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001196 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001197 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001198 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001199
1200
jadmanski0afbb632008-06-06 21:10:57 +00001201 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001202 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001203 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001204
1205
class QueueTask(AbstractQueueTask):
    """AbstractQueueTask for jobs that run against one or more hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Validate entry/host states, then mark hosts Running and dirty."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)


    def _finish_task(self):
        """Move entries to Gathering; keep their hosts in Running."""
        super(QueueTask, self)._finish_task()

        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        base_invocation = super(QueueTask, self)._command_line()
        return base_invocation + ['--verify_job_repo_url']
1239
1240
Dan Shi1a189052013-10-28 14:41:35 -07001241class HostlessQueueTask(AbstractQueueTask):
mbligh4608b002010-01-05 18:22:35 +00001242 def __init__(self, queue_entry):
1243 super(HostlessQueueTask, self).__init__([queue_entry])
1244 self.queue_entry_ids = [queue_entry.id]
1245
1246
1247 def prolog(self):
1248 self.queue_entries[0].update_field('execution_subdir', 'hostless')
1249 super(HostlessQueueTask, self).prolog()
1250
1251
mbligh4608b002010-01-05 18:22:35 +00001252 def _finish_task(self):
1253 super(HostlessQueueTask, self)._finish_task()
Dan Shi76af8022013-10-19 01:59:49 -07001254
1255 # When a job is added to database, its initial status is always
1256 # Starting. In a scheduler tick, scheduler finds all jobs in Starting
1257 # status, check if any of them can be started. If scheduler hits some
1258 # limit, e.g., max_hostless_jobs_per_drone, max_jobs_started_per_cycle,
1259 # scheduler will leave these jobs in Starting status. Otherwise, the
1260 # jobs' status will be changed to Running, and an autoserv process will
1261 # be started in drone for each of these jobs.
1262 # If the entry is still in status Starting, the process has not started
1263 # yet. Therefore, there is no need to parse and collect log. Without
1264 # this check, exception will be raised by scheduler as execution_subdir
1265 # for this queue entry does not have a value yet.
1266 hqe = self.queue_entries[0]
1267 if hqe.status != models.HostQueueEntry.Status.STARTING:
1268 hqe.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001269
1270
mbligh36768f02008-02-22 18:28:33 +00001271if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00001272 main()