blob: da6d4da94b7732e6cffcecda524cbb051d200f2e [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Aviv Keshet225bdfe2013-03-05 10:10:08 -08002#pylint: disable-msg=C0111
mbligh36768f02008-02-22 18:28:33 +00003
4"""
5Autotest scheduler
6"""
showard909c7a62008-07-15 21:52:38 +00007
Aviv Keshet225bdfe2013-03-05 10:10:08 -08008import datetime, optparse, os, signal
beeps5e2bb4a2013-10-28 11:26:45 -07009import sys, time
Aviv Keshet225bdfe2013-03-05 10:10:08 -080010import logging, gc
showard402934a2009-12-21 22:20:47 +000011
Alex Miller05d7b4c2013-03-04 07:49:38 -080012import common
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000014
15import django.db
16
Prashanth B0e960282014-05-13 19:38:28 -070017from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070018from autotest_lib.client.common_lib import utils
Michael Liangda8c60a2014-06-03 13:24:51 -070019from autotest_lib.client.common_lib.cros.graphite import stats
Prashanth B0e960282014-05-13 19:38:28 -070020from autotest_lib.frontend.afe import models, rpc_utils
beeps5e2bb4a2013-10-28 11:26:45 -070021from autotest_lib.scheduler import agent_task, drone_manager, drones
22from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
23from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070024from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070025from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070026from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000027from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080028from autotest_lib.scheduler import status_server, scheduler_config
Prashanth Bf66d51b2014-05-06 12:42:25 -070029from autotest_lib.scheduler import scheduler_lib
Aviv Keshet308e7362013-05-21 14:43:16 -070030from autotest_lib.server import autoserv_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080031
# Prefixes of the pid files used by the babysitter process and by the
# scheduler itself.
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# RESULTS_DIR is overwritten with the command line argument in
# main_without_exception_handling().
RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# An explicit AUTOTEST_DIR environment variable wins over the path derived
# from this file's location. (`in` replaces dict.has_key(), which no longer
# exists in Python 3.)
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""
52
# Module-level scheduler state. _db_manager is assigned in initialize(),
# _drone_manager in initialize_globals(), and _shutdown is flipped by
# handle_signal().
_db_manager = None
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None

# Read once at import time; BaseDispatcher.__init__ uses it to pick the host
# scheduler implementation.
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION,
        'inline_host_acquisition',
        type=bool,
        default=True)
64
mbligh36768f02008-02-22 18:28:33 +000065
Eric Lie0493a42010-11-15 13:05:43 -080066def _parser_path_default(install_dir):
67 return os.path.join(install_dir, 'tko', 'parse')
# Resolve the parser path through the site hook, with _parser_path_default
# as the fallback argument.
# NOTE(review): presumably import_site_function returns the site module's
# 'parser_path' when it exists and the default otherwise -- confirm.
_parser_path_func = utils.import_site_function(
        __file__,
        'autotest_lib.scheduler.site_monitor_db',
        'parser_path',
        _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
72
mbligh36768f02008-02-22 18:28:33 +000073
mbligh83c1e9e2009-05-01 23:10:41 +000074def _site_init_monitor_db_dummy():
75 return {}
76
77
def _verify_default_drone_set_exists():
    """
    Check that a default drone set is configured when drone sets are enabled.

    @raises scheduler_lib.SchedulerError: If drone sets are enabled but no
            default drone set name is configured.
    """
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise scheduler_lib.SchedulerError(
            'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000083
84
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    # Currently the only consistency check is the drone set default.
    _verify_default_drone_set_exists()
88
89
def main():
    """
    Entry point: run the scheduler, log anything that escapes, and always
    remove the scheduler pid file on the way out.

    SystemExit is re-raised untouched; every other exception is logged and
    then re-raised.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        # A deliberate exit is not an error worth logging.
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000101
102
def main_without_exception_handling():
    """
    Parse the command line, start the status server and run the dispatcher
    loop until a shutdown is requested.

    Expects exactly one positional argument, the results directory; prints
    the usage and returns otherwise. Exits with status 1 when the scheduler
    is disabled in the global config.

    Exceptions raised while initializing or ticking are logged through the
    email manager, after which queued emails are sent and the status server,
    drone manager and database connection are shut down.
    """
    global RESULTS_DIR, _autoserv_path, _testing_mode

    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
            "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
            _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        _autoserv_path = 'autoserv_dummy'
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except Exception:
        email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    # initialize() can fail before these globals are assigned; calling a
    # method on None here would replace the already-logged failure with an
    # unrelated AttributeError.
    if _drone_manager is not None:
        _drone_manager.shutdown()
    if _db_manager is not None:
        _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000170
171
def handle_signal(signum, frame):
    """
    Signal handler that asks the main loop to stop.

    Sets the module-level _shutdown flag, which the loop in
    main_without_exception_handling() checks before each tick.

    @param signum: Signal number (unused).
    @param frame: Current stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000176
177
def initialize():
    """
    One-time scheduler setup performed before the dispatcher is created.

    Refuses to start (exit status 1) when another monitor_db is alive,
    writes the pid file, switches to the stress test database in testing
    mode, prepends the server directory to PATH, creates the database
    connection manager, installs the SIGINT/SIGTERM handlers and initializes
    the drone manager from the 'drones' and 'results_host' config values.
    """
    global _db_manager
    config = global_config.global_config

    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    os.environ['PATH'] = ':'.join((AUTOTEST_SERVER_DIR, os.environ['PATH']))
    _db_manager = scheduler_lib.ConnectionManager()

    logging.info("Setting signal handler")
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    # 'drones' is a comma separated list of hostnames.
    drone_hostnames = config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [name.strip() for name in drone_hostnames.split(',')]
    results_host = config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host',
            default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000210
211
def initialize_globals():
    """Bind the module-level _drone_manager to drone_manager.instance()."""
    global _drone_manager
    manager_instance = drone_manager.instance()
    _drone_manager = manager_instance
215
216
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line for a job run.

    Delegates to autoserv_utils.autoserv_run_job_command, using the
    module-level autoserv directory and the drone manager's working
    directory as the results directory.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - Forwarded unchanged to autoserv_run_job_command.

    @returns The autoserv command line as a list of executable + parameters.
    """
    command = autoserv_utils.autoserv_run_job_command(
            _autoserv_directory,
            machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args,
            job=job,
            queue_entry=queue_entry,
            verbose=verbose)
    return command
showard87ba02a2009-04-20 19:37:32 +0000234
235
Simran Basia858a232012-08-21 11:04:37 -0700236class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800237
238
def __init__(self):
    """
    Set up agent bookkeeping, the periodic cleanup helpers, the debug flags
    read from the scheduler config section, and the host scheduler.
    """
    config = global_config.global_config
    section = scheduler_config.CONFIG_SECTION

    self._agents = []
    self._last_clean_time = time.time()

    user_cleanup_time = scheduler_config.config.clean_interval
    self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db_manager.get_connection(), user_cleanup_time)
    self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
            _db_manager.get_connection())

    # Agents indexed by host id and by queue entry id.
    self._host_agents = {}
    self._queue_entry_agents = {}

    self._tick_count = 0
    self._last_garbage_stats_time = time.time()
    gc_stats_interval_mins = config.get_config_value(
            section, 'gc_stats_interval_mins', type=int, default=6 * 60)
    self._seconds_between_garbage_stats = 60 * gc_stats_interval_mins

    self._tick_debug = config.get_config_value(
            section, 'tick_debug', type=bool, default=False)
    self._extra_debugging = config.get_config_value(
            section, 'extra_debugging', type=bool, default=False)

    # If _inline_host_acquisition is set the scheduler will acquire and
    # release hosts against jobs inline, with the tick. Otherwise the
    # scheduler will only focus on jobs that already have hosts, and
    # will not explicitly unlease a host when a job finishes using it.
    self._job_query_manager = query_managers.AFEJobQueryManager()
    if _inline_host_acquisition:
        self._host_scheduler = host_scheduler.BaseHostScheduler()
    else:
        self._host_scheduler = host_scheduler.DummyHostScheduler()
270
mbligh36768f02008-02-22 18:28:33 +0000271
def initialize(self, recover_hosts=True):
    """
    Initialize the cleanup helpers and recover state left by a previous run.

    @param recover_hosts: When true, also run host recovery after the
            (unconditional) process recovery.
    """
    for cleanup in (self._periodic_cleanup, self._24hr_upkeep):
        cleanup.initialize()

    # always recover processes
    self._recover_processes()

    if not recover_hosts:
        return
    self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000281
282
Simran Basi0ec94dd2012-08-28 09:50:10 -0700283 def _log_tick_msg(self, msg):
284 if self._tick_debug:
285 logging.debug(msg)
286
287
Simran Basidef92872012-09-20 13:34:34 -0700288 def _log_extra_msg(self, msg):
289 if self._extra_debugging:
290 logging.debug(msg)
291
292
def tick(self):
    """
    Run one scheduler cycle.

    Every major step is announced through _log_tick_msg before it runs, so
    that with tick debugging enabled the log shows where the tick time is
    being spent. The step order is significant and must be preserved.
    """
    tick_timer = stats.Timer('scheduler.tick')
    announce = self._log_tick_msg

    # Housekeeping, and kicking off the drone refresh.
    announce('Calling new tick, starting garbage collection().')
    self._garbage_collection()
    announce('Calling _drone_manager.trigger_refresh().')
    _drone_manager.trigger_refresh()
    announce('Calling _run_cleanup().')
    self._run_cleanup()

    # Scheduling work.
    announce('Calling _process_recurring_runs().')
    self._process_recurring_runs()
    announce('Calling _schedule_delay_tasks().')
    self._schedule_delay_tasks()
    announce('Calling _schedule_running_host_queue_entries().')
    self._schedule_running_host_queue_entries()
    announce('Calling _schedule_special_tasks().')
    self._schedule_special_tasks()
    announce('Calling _schedule_new_jobs().')
    self._schedule_new_jobs()

    # Collect the refresh triggered above, then handle aborts and agents.
    announce('Calling _drone_manager.sync_refresh().')
    _drone_manager.sync_refresh()
    announce('Calling _find_aborting().')
    self._find_aborting()
    announce('Calling _find_aborted_special_tasks().')
    self._find_aborted_special_tasks()
    announce('Calling _handle_agents().')
    self._handle_agents()
    announce('Calling _host_scheduler.tick().')
    self._host_scheduler.tick()
    announce('Calling _drone_manager.execute_actions().')
    _drone_manager.execute_actions()

    # The last two steps run inside their own timer clients.
    announce('Calling email_manager.manager.send_queued_emails().')
    with tick_timer.get_client('email_manager_send_queued_emails'):
        email_manager.manager.send_queued_emails()
    announce('Calling django.db.reset_queries().')
    with tick_timer.get_client('django_db_reset_queries'):
        django.db.reset_queries()

    self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000336
showard97aed502008-11-04 02:01:24 +0000337
mblighf3294cc2009-04-08 21:17:38 +0000338 def _run_cleanup(self):
339 self._periodic_cleanup.run_cleanup_maybe()
340 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000341
mbligh36768f02008-02-22 18:28:33 +0000342
showardf13a9e22009-12-18 22:54:09 +0000343 def _garbage_collection(self):
344 threshold_time = time.time() - self._seconds_between_garbage_stats
345 if threshold_time < self._last_garbage_stats_time:
346 # Don't generate these reports very often.
347 return
348
349 self._last_garbage_stats_time = time.time()
350 # Force a full level 0 collection (because we can, it doesn't hurt
351 # at this interval).
352 gc.collect()
353 logging.info('Logging garbage collector stats on tick %d.',
354 self._tick_count)
355 gc_stats._log_garbage_collector_stats()
356
357
showard170873e2009-01-07 00:22:26 +0000358 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
359 for object_id in object_ids:
360 agent_dict.setdefault(object_id, set()).add(agent)
361
362
363 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
364 for object_id in object_ids:
365 assert object_id in agent_dict
366 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700367 # If an ID has no more active agent associated, there is no need to
368 # keep it in the dictionary. Otherwise, scheduler will keep an
369 # unnecessarily big dictionary until being restarted.
370 if not agent_dict[object_id]:
371 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000372
373
def add_agent_task(self, agent_task):
    """
    Creates and adds an agent to the dispatchers list.

    The new agent wraps the given task and is indexed twice: once in a dict
    keyed by the agent's queue_entry_ids and once in a dict keyed by its
    host_ids. So theoretically, a host can have any number of agents
    associated with it, and each of them can have any special agent task,
    though in practice we never see > 1 agent/task per host at any time.

    @param agent_task: A SpecialTask for the agent to manage.
    """
    new_agent = Agent(agent_task)
    self._agents.append(new_agent)
    new_agent.dispatcher = self

    registrations = (
            (self._host_agents, new_agent.host_ids),
            (self._queue_entry_agents, new_agent.queue_entry_ids),
    )
    for registry, object_ids in registrations:
        self._register_agent_for_ids(registry, object_ids, new_agent)
mblighd5c95802008-03-05 00:33:46 +0000394
showard170873e2009-01-07 00:22:26 +0000395
def get_agents_for_entry(self, queue_entry):
    """
    Find agents corresponding to the specified queue_entry.

    @param queue_entry: Object whose id is looked up among the registered
            queue entry agents.
    @returns A new list of the agents registered for the entry; empty when
            there are none.
    """
    entry_id = queue_entry.id
    if entry_id in self._queue_entry_agents:
        return list(self._queue_entry_agents[entry_id])
    return []
showard170873e2009-01-07 00:22:26 +0000401
402
def host_has_agent(self, host):
    """
    Determine if there is currently an Agent present using this host.

    @param host: Object whose id is looked up among the registered host
            agents.
    @returns True if at least one agent is registered for the host.
    """
    registered = self._host_agents.get(host.id)
    return bool(registered)
mbligh36768f02008-02-22 18:28:33 +0000408
409
def remove_agent(self, agent):
    """
    Drop agent from the agent list and from both id indexes.

    @param agent: The agent to remove; must currently be registered.
    """
    self._agents.remove(agent)

    registrations = (
            (self._host_agents, agent.host_ids),
            (self._queue_entry_agents, agent.queue_entry_ids),
    )
    for registry, object_ids in registrations:
        self._unregister_agent_for_ids(registry, object_ids, agent)
showardec113162008-05-08 00:52:49 +0000416
417
def _host_has_scheduled_special_task(self, host):
    """
    Tell whether the host has a special task that is neither active nor
    complete, i.e. one that is still waiting to run.

    @param host: Object whose id is used to filter the special tasks.
    @returns True if at least one such SpecialTask exists.
    """
    queued_tasks = models.SpecialTask.objects.filter(
            host__id=host.id, is_active=False, is_complete=False)
    return bool(queued_tasks)
422
423
def _recover_processes(self):
    """
    Re-attach to the work that was in flight when the scheduler last
    stopped, then reinitialize the drones.
    """
    recovery_tasks = self._create_recovery_agent_tasks()
    self._register_pidfiles(recovery_tasks)
    _drone_manager.refresh()
    self._recover_tasks(recovery_tasks)

    self._recover_pending_entries()
    self._check_for_unrecovered_verifying_entries()
    self._reverify_remaining_hosts()

    # reinitialize drones after killing orphaned processes, since they can
    # leave around files when they die
    _drone_manager.execute_actions()
    _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000436
showard170873e2009-01-07 00:22:26 +0000437
showardd1195652009-12-08 22:21:02 +0000438 def _create_recovery_agent_tasks(self):
439 return (self._get_queue_entry_agent_tasks()
440 + self._get_special_task_agent_tasks(is_active=True))
441
442
def _get_queue_entry_agent_tasks(self):
    """
    Get agent tasks for all hqe in the specified states.

    Loosely this translates to taking a hqe in one of the specified states,
    say parsing, and getting an AgentTask for it, like the FinalReparseTask,
    through _get_agent_task_for_queue_entry. Each queue entry can only have
    one agent task at a time, but there might be multiple queue entries in
    the group.

    @return: A list of AgentTasks.
    """
    # host queue entry statuses handled directly by AgentTasks (Verifying is
    # handled through SpecialTasks, so is not listed here)
    hqe_status = models.HostQueueEntry.Status
    handled_statuses = (hqe_status.STARTING,
                        hqe_status.RUNNING,
                        hqe_status.GATHERING,
                        hqe_status.PARSING,
                        hqe_status.ARCHIVING)
    quoted_statuses = ["'%s'" % status for status in handled_statuses]
    where_clause = 'status IN (%s)' % ','.join(quoted_statuses)
    queue_entries = scheduler_models.HostQueueEntry.fetch(where=where_clause)
    stats.Gauge('scheduler.jobs_per_tick').send(
            'running', len(queue_entries))

    tasks = []
    claimed_entries = set()
    for entry in queue_entries:
        # Skip entries that already have an agent, and entries already
        # picked up by a synchronous job earlier in this loop.
        if self.get_agents_for_entry(entry) or entry in claimed_entries:
            continue
        task = self._get_agent_task_for_queue_entry(entry)
        tasks.append(task)
        claimed_entries.update(task.queue_entries)
    return tasks
showard170873e2009-01-07 00:22:26 +0000481
482
def _get_special_task_agent_tasks(self, is_active=False):
    """
    Build agent tasks for all incomplete special tasks.

    @param is_active: Select special tasks whose is_active flag has this
            value.
    @returns A list of AgentTasks, one per matching SpecialTask.
    """
    matching_tasks = models.SpecialTask.objects.filter(
            is_active=is_active, is_complete=False)
    agent_tasks = []
    for special_task in matching_tasks:
        agent_tasks.append(
                self._get_agent_task_for_special_task(special_task))
    return agent_tasks
488
489
def _get_agent_task_for_queue_entry(self, queue_entry):
    """
    Construct an AgentTask instance for the given active HostQueueEntry.

    @param queue_entry: a HostQueueEntry
    @return: an AgentTask to run the queue entry
    @raises scheduler_lib.SchedulerError: if the entry's status is not one
            that maps to an AgentTask.
    """
    hqe_status = models.HostQueueEntry.Status
    group_entries = queue_entry.job.get_group_entries(queue_entry)
    self._check_for_duplicate_host_entries(group_entries)

    if queue_entry.status in (hqe_status.STARTING, hqe_status.RUNNING):
        if queue_entry.is_hostless():
            return HostlessQueueTask(queue_entry=queue_entry)
        return QueueTask(queue_entries=group_entries)

    # Post-job statuses each map to exactly one task class.
    postjob_task_classes = (
            (hqe_status.GATHERING, postjob_task.GatherLogsTask),
            (hqe_status.PARSING, postjob_task.FinalReparseTask),
            (hqe_status.ARCHIVING, postjob_task.ArchiveResultsTask),
    )
    for status, task_class in postjob_task_classes:
        if queue_entry.status == status:
            return task_class(queue_entries=group_entries)

    raise scheduler_lib.SchedulerError(
            '_get_agent_task_for_queue_entry got entry with '
            'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000515
516
def _check_for_duplicate_host_entries(self, task_entries):
    """
    Assert that no host used by the given entries already has an agent.

    Entries without a host, and entries in the Parsing or Archiving status,
    are not checked.

    @param task_entries: Iterable of HostQueueEntry objects.
    """
    non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                         models.HostQueueEntry.Status.ARCHIVING)
    for task_entry in task_entries:
        if task_entry.host is None:
            continue
        if task_entry.status in non_host_statuses:
            continue
        self._assert_host_has_no_agent(task_entry)
525
526
527 def _assert_host_has_no_agent(self, entry):
528 """
529 @param entry: a HostQueueEntry or a SpecialTask
530 """
531 if self.host_has_agent(entry.host):
532 agent = tuple(self._host_agents.get(entry.host.id))[0]
Prashanth B0e960282014-05-13 19:38:28 -0700533 raise scheduler_lib.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000534 'While scheduling %s, host %s already has a host agent %s'
535 % (entry, entry.host, agent.task))
536
537
def _get_agent_task_for_special_task(self, special_task):
    """
    Construct an AgentTask instance to run the given SpecialTask.

    This method only builds the AgentTask; it does not register it with the
    dispatcher (callers do that through add_agent_task).

    A special task is created through schedule_special_tasks, but only if
    the host doesn't already have an agent. All special agent tasks are
    given a host on creation, and a Null hqe. To create a SpecialAgentTask
    object, you need a models.SpecialTask. If the SpecialTask used to create
    a SpecialAgentTask object contains a hqe it's passed on to the special
    agent task, which creates a HostQueueEntry and saves it as its
    queue_entry.

    @param special_task: a models.SpecialTask instance
    @returns an AgentTask to run this SpecialTask
    @raises scheduler_lib.SchedulerError: if the task's host already has an
            agent, or if no AgentTask class handles the task's type.
    """
    self._assert_host_has_no_agent(special_task)

    special_agent_task_classes = (prejob_task.CleanupTask,
                                  prejob_task.VerifyTask,
                                  prejob_task.RepairTask,
                                  prejob_task.ResetTask,
                                  prejob_task.ProvisionTask)

    for agent_task_class in special_agent_task_classes:
        if agent_task_class.TASK_TYPE == special_task.task:
            return agent_task_class(task=special_task)

    # Format the task into the message. Passing it as a second positional
    # argument would leave the exception with a tuple of args instead of a
    # readable message.
    raise scheduler_lib.SchedulerError(
            'No AgentTask class for task %s' % str(special_task))
showardd1195652009-12-08 22:21:02 +0000568
569
570 def _register_pidfiles(self, agent_tasks):
571 for agent_task in agent_tasks:
572 agent_task.register_necessary_pidfiles()
573
574
def _recover_tasks(self, agent_tasks):
    """
    Recover each agent task and add it to the dispatcher.

    Processes claimed by a recovered task are removed from the set of
    orphaned autoserv processes; whatever is left is handed to
    _check_for_remaining_orphan_processes.

    @param agent_tasks: Iterable of AgentTasks to recover.
    """
    orphans = _drone_manager.get_orphaned_autoserv_processes()

    for task in agent_tasks:
        task.recover()
        if task.monitor:
            if task.monitor.has_process():
                orphans.discard(task.monitor.get_process())
        self.add_agent_task(task)

    self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000585
586
def _get_unassigned_entries(self, status):
    """
    Generate the host queue entries in the given status that have no agent.

    @param status: The HostQueueEntry status to select.
    @returns A generator of HostQueueEntry objects.
    """
    where_clause = "status = '%s'" % status
    for entry in scheduler_models.HostQueueEntry.fetch(where=where_clause):
        # The status can change during iteration, e.g., if job.run()
        # sets a group of queue entries to Starting
        if entry.status != status:
            continue
        if self.get_agents_for_entry(entry):
            continue
        yield entry
594
595
showard6878e8b2009-07-20 22:37:45 +0000596 def _check_for_remaining_orphan_processes(self, orphans):
597 if not orphans:
598 return
599 subject = 'Unrecovered orphan autoserv processes remain'
600 message = '\n'.join(str(process) for process in orphans)
601 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000602
603 die_on_orphans = global_config.global_config.get_config_value(
604 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
605
606 if die_on_orphans:
607 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000608
showard170873e2009-01-07 00:22:26 +0000609
def _recover_pending_entries(self):
    """Call on_pending() on every Pending queue entry that has no agent."""
    pending_status = models.HostQueueEntry.Status.PENDING
    for hqe in self._get_unassigned_entries(pending_status):
        logging.info('Recovering Pending entry %s', hqe)
        hqe.on_pending()
615
616
def _check_for_unrecovered_verifying_entries(self):
    """Fail if a Verifying queue entry has no incomplete pre-job task.

    A Verifying entry is considered unrecovered when no incomplete Cleanup
    or Verify SpecialTask references it.

    @raises scheduler_lib.SchedulerError: Listing every unrecovered entry.
    """
    verifying = models.HostQueueEntry.Status.VERIFYING
    prejob_task_types = (models.SpecialTask.Task.CLEANUP,
                         models.SpecialTask.Task.VERIFY)

    unrecovered = []
    for hqe in scheduler_models.HostQueueEntry.fetch(
            where='status = "%s"' % verifying):
        incomplete_tasks = models.SpecialTask.objects.filter(
                task__in=prejob_task_types,
                queue_entry__id=hqe.id,
                is_complete=False)
        if incomplete_tasks.count() == 0:
            unrecovered.append(hqe)

    if not unrecovered:
        return

    details = '\n'.join([str(hqe) for hqe in unrecovered])
    raise scheduler_lib.SchedulerError(
            '%d unrecovered verifying host queue entries:\n%s' %
            (len(unrecovered), details))
showard170873e2009-01-07 00:22:26 +0000635
636
def _schedule_special_tasks(self):
    """
    Turn queued SpecialTasks into agent tasks for hosts without agents.

    Prioritized special tasks are fetched from the job query manager. A
    task whose host already has an agent is skipped; every other task is
    translated to its AgentTask and added to the dispatcher.
    """
    # When the host scheduler is responsible for acquisition we only want
    # to run tasks with leased hosts. All hqe tasks will already have
    # leased hosts, and we don't want to run frontend tasks till the host
    # scheduler has vetted the assignment. Note that this doesn't include
    # frontend tasks with hosts leased by other active hqes.
    leased_only = not _inline_host_acquisition
    special_tasks = self._job_query_manager.get_prioritized_special_tasks(
            only_tasks_with_leased_hosts=leased_only)
    for special_task in special_tasks:
        if not self.host_has_agent(special_task.host):
            new_agent_task = self._get_agent_task_for_special_task(
                    special_task)
            self.add_agent_task(new_agent_task)
showard1ff7b2e2009-05-15 23:17:18 +0000657
658
def _reverify_remaining_hosts(self):
    """Reverify hosts still in an active status.

    Covers hosts whose status is Repairing, Verifying, Cleaning or
    Provisioning; finding one here should never happen.
    """
    active_statuses_clause = (
            "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')")
    self._reverify_hosts_where(
            active_statuses_clause,
            print_message=('Recovering active host %s - this probably '
                           'indicates a scheduler bug'))
mblighbb421852008-03-11 22:36:16 +0000667
668
def _reverify_hosts_where(self, where,
                          print_message='Reverifying host %s'):
    """Create a Cleanup SpecialTask for each matching, unhandled host.

    Only hosts that are neither locked nor invalid are considered.

    @param where: SQL condition, ANDed with the locked/invalid filter.
    @param print_message: Log format taking the hostname; a false value
            disables the log line.
    """
    full_where = 'locked = 0 AND invalid = 0 AND ' + where
    for host in scheduler_models.Host.fetch(where=full_where):
        # Skip hosts that have already been recovered in some way, and
        # hosts that will get a special task scheduled on the next cycle.
        already_handled = (self.host_has_agent(host) or
                           self._host_has_scheduled_special_task(host))
        if already_handled:
            continue
        if print_message:
            logging.info(print_message, host.hostname)
        afe_host = models.Host.objects.get(id=host.id)
        models.SpecialTask.objects.create(
                task=models.SpecialTask.Task.CLEANUP,
                host=afe_host)
mbligh36768f02008-02-22 18:28:33 +0000684
685
def _recover_hosts(self):
    """Reverify every host whose status is 'Repair Failed'."""
    self._reverify_hosts_where("status = 'Repair Failed'",
                               print_message='Reverifying dead host %s')
mbligh62ba2ed2008-04-30 17:09:25 +0000691
692
def _refresh_pending_queue_entries(self):
    """
    Look up the pending HostQueueEntries via the job query manager.

    Only hostless entries are requested unless inline host acquisition is
    enabled.

    @returns The pending queue entries, or an empty list when the lookup
            result is empty.
    """
    hostless_only = not _inline_host_acquisition
    pending = self._job_query_manager.get_pending_queue_entries(
            only_hostless=hostless_only)
    return pending or []
705
706
def _schedule_hostless_job(self, queue_entry):
    """Schedule a hostless (suite) job.

    Adds a HostlessQueueTask for the entry to the dispatcher, then moves
    the entry to Starting.

    @param queue_entry: The queue_entry representing the hostless job.
    """
    hostless_task = HostlessQueueTask(queue_entry)
    self.add_agent_task(hostless_task)
    queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000714
715
def _schedule_host_job(self, host, queue_entry):
    """Schedules a job on the given host.

    If the host has no agent, scheduling is delegated to the host
    scheduler's schedule_host_job(). If the host already has an agent, the
    job is not scheduled and a notification email is enqueued instead.

    @param queue_entry: The job to schedule.
    @param host: The host to schedule the job on.
    """
    if not self.host_has_agent(host):
        self._host_scheduler.schedule_host_job(host, queue_entry)
        return

    host_agent_task = list(self._host_agents.get(host.id))[0].task
    # Not every agent task has a single `queue_entry`: the queue tasks in
    # this file keep a `queue_entries` list instead. Fall back rather than
    # letting the notification itself raise AttributeError.
    conflicting_entry = getattr(host_agent_task, 'queue_entry', None)
    if conflicting_entry is None:
        conflicting_entry = getattr(host_agent_task, 'queue_entries', None)

    subject = 'Host with agents assigned to an HQE'
    message = ('HQE: %s assigned host %s, but the host has '
               'agent: %s for queue_entry %s. The HQE '
               'will have to try and acquire a host next tick ' %
               (queue_entry, host.hostname, host_agent_task,
                conflicting_entry))
    email_manager.manager.enqueue_notify_email(subject, message)
beepscc9fc702013-12-02 12:45:38 -0800737
738
def _schedule_new_jobs(self):
    """
    Schedule the pending queue entries found this tick.

    Hostless entries are scheduled directly. For the remaining entries the
    host scheduler is asked for host assignments and each assignment is
    scheduled through _schedule_host_job. Counts of hostless jobs, jobs
    that got a host and jobs left without one are sent to the
    'scheduler.jobs_per_tick' gauge.
    """
    gauge_key = 'scheduler.jobs_per_tick'
    pending_entries = self._refresh_pending_queue_entries()
    logging.debug('Processing %d queue_entries', len(pending_entries))

    hostless_count = 0
    host_jobs = []
    for hqe in pending_entries:
        if hqe.is_hostless():
            self._schedule_hostless_job(hqe)
            hostless_count += 1
        else:
            host_jobs.append(hqe)

    stats.Gauge(gauge_key).send('new_hostless_jobs', hostless_count)
    if not host_jobs:
        return

    if not _inline_host_acquisition:
        # Entries needing hosts are unexpected in this mode; notify, then
        # acquire hosts for them anyway.
        job_names = [str(job) for job in host_jobs]
        message = ('Found %s jobs that need hosts though '
                   '_inline_host_acquisition=%s. Will acquire hosts.' %
                   (job_names, _inline_host_acquisition))
        email_manager.manager.enqueue_notify_email(
                'Processing unexpected host acquisition requests', message)

    assigned_count = 0
    for assignment in self._host_scheduler.find_hosts_for_jobs(host_jobs):
        self._schedule_host_job(assignment.host, assignment.job)
        assigned_count += 1

    stats.Gauge(gauge_key).send('new_jobs_with_hosts', assigned_count)
    stats.Gauge(gauge_key).send('new_jobs_without_hosts',
                                len(host_jobs) - assigned_count)
showardb95b1bd2008-08-15 18:11:04 +0000784
785
def _schedule_running_host_queue_entries(self):
    """
    Adds agents to the dispatcher.

    Every agent task returned by _get_queue_entry_agent_tasks() is passed
    to add_agent_task(), which makes it available to _handle_agents.
    """
    for queue_entry_task in self._get_queue_entry_agent_tasks():
        self.add_agent_task(queue_entry_task)
showard8cc058f2009-09-08 16:26:33 +0000803
804
def _schedule_delay_tasks(self):
    """Add delayed-callback tasks for queue entries in Waiting status.

    Entries whose job returns no task are skipped.
    """
    waiting = models.HostQueueEntry.Status.WAITING
    for hqe in scheduler_models.HostQueueEntry.fetch(
            where='status = "%s"' % waiting):
        delay_task = hqe.job.schedule_delayed_callback_task(hqe)
        if not delay_task:
            continue
        self.add_agent_task(delay_task)
showard8cc058f2009-09-08 16:26:33 +0000811
812
def _find_aborting(self):
    """
    Abort every queue entry that is marked aborted but not complete.

    For each such entry its not-yet-active special tasks are marked
    complete, its agents and the entry itself are aborted, and afterwards
    each affected job is asked to stop if necessary.
    """
    affected_jobs = set()
    aborted_entries = scheduler_models.HostQueueEntry.fetch(
            where='aborted=1 and complete=0')
    for hqe in aborted_entries:
        logging.info('Aborting %s', hqe)

        # The task would have started off with both is_complete and
        # is_active = False. Aborted tasks are neither active nor complete.
        # For all currently active tasks this will happen through the agent,
        # but we need to manually update the special tasks that haven't
        # started yet, because they don't have agents.
        unstarted_tasks = models.SpecialTask.objects.filter(
                is_active=False, queue_entry_id=hqe.id)
        unstarted_tasks.update(is_complete=True)

        for agent in self.get_agents_for_entry(hqe):
            agent.abort()
        hqe.abort(self)
        affected_jobs.add(hqe.job)

    logging.debug('Aborting %d jobs this tick.', len(affected_jobs))
    for job in affected_jobs:
        job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000841
842
def _find_aborted_special_tasks(self):
    """
    Abort active SpecialTasks that have been marked as aborted.

    Queries the database for SpecialTasks with is_active and is_aborted
    set, then runs epilog() and abort() on every SpecialAgentTask agent
    registered for the task's host.
    """
    # The completed and active bits are very important when it comes
    # to scheduler correctness. The active bit is set through the prolog
    # of a special task, and reset through the cleanup method of the
    # SpecialAgentTask. The cleanup is called both through the abort and
    # epilog. The complete bit is set in several places, and in general
    # a hanging job will have is_active=1 is_complete=0, while a special
    # task which completed will have is_active=0 is_complete=1. To check
    # aborts we directly check active because the complete bit is set in
    # several places, including the epilog of agent tasks.
    for special_task in models.SpecialTask.objects.filter(is_active=True,
                                                          is_aborted=True):
        # There are 2 ways to get the agent associated with a task,
        # through the host and through the hqe. A special task
        # always needs a host, but doesn't always need a hqe.
        host_agents = self._host_agents.get(special_task.host.id, [])
        for agent in host_agents:
            if not isinstance(agent.task, agent_task.SpecialAgentTask):
                continue

            # The epilog performs critical actions such as
            # queueing the next SpecialTask, requeuing the
            # hqe etc, however it doesn't actually kill the
            # monitor process and set the 'done' bit. Epilogs
            # assume that the job failed, and that the monitor
            # process has already written an exit code. The
            # done bit is a necessary condition for
            # _handle_agents to schedule any more special
            # tasks against the host, and it must be set
            # in addition to is_active, is_complete and success.
            agent.task.epilog()
            agent.task.abort()
881
882
def _can_start_agent(self, agent, num_started_this_cycle,
                     have_reached_limit):
    """Decide whether the given agent may be started in this cycle.

    @param agent: Agent whose task provides num_processes,
            owner_username and get_drone_hostnames_allowed().
    @param num_started_this_cycle: Processes already started this cycle.
    @param have_reached_limit: True once an earlier agent was refused.
    @returns True if the agent may start, False otherwise.
    """
    needed = agent.task.num_processes

    # always allow zero-process agents to run
    if needed == 0:
        return True

    # don't allow any nonzero-process agents to run after we've reached a
    # limit (this avoids starvation of many-process agents)
    if have_reached_limit:
        return False

    # total process throttling
    runnable = _drone_manager.max_runnable_processes(
            agent.task.owner_username,
            agent.task.get_drone_hostnames_allowed())
    if needed > runnable:
        return False

    # if a single agent exceeds the per-cycle throttling, still allow it to
    # run when it's the first agent in the cycle
    if num_started_this_cycle == 0:
        return True

    # per-cycle throttling
    per_cycle_cap = scheduler_config.config.max_processes_started_per_cycle
    return num_started_this_cycle + needed <= per_cycle_cap
907
908
def _handle_agents(self):
    """
    Tick every agent of the dispatcher and drop the finished ones.

    An agent that has not started yet is only ticked if _can_start_agent
    allows it; once one agent is refused, the limit is treated as reached
    for the rest of this cycle. Agents reporting is_done() after their
    tick are removed from the dispatcher. Finally the number of running
    processes and the number started this cycle are logged.
    """
    started_this_cycle = 0
    limit_reached = False
    logging.debug('Handling %d Agents', len(self._agents))

    # iterate over copy, so we can remove agents during iteration
    for agent in list(self._agents):
        self._log_extra_msg('Processing Agent with Host Ids: %s and '
                            'queue_entry ids:%s' % (agent.host_ids,
                                                    agent.queue_entry_ids))
        if not agent.started:
            may_start = self._can_start_agent(agent, started_this_cycle,
                                              limit_reached)
            if not may_start:
                limit_reached = True
                logging.debug('Reached Limit of allowed running Agents.')
                continue
            started_this_cycle += agent.task.num_processes
            self._log_extra_msg('Starting Agent')

        agent.tick()
        self._log_extra_msg('Agent tick completed.')

        if agent.is_done():
            self._log_extra_msg("Agent finished")
            self.remove_agent(agent)

    logging.info('%d running processes. %d added this cycle.',
                 _drone_manager.total_running_processes(),
                 started_this_cycle)
mbligh36768f02008-02-22 18:28:33 +0000966
967
def _process_recurring_runs(self):
    """Create jobs for recurring runs that are due and reschedule them.

    For every RecurringRun whose start_date is not in the future, a new
    job is created from the run's template job. A failure to create the
    job is logged and does not stop the loop. Afterwards the run is
    deleted when its loop_count is 1; for any other non-zero loop_count
    the start_date is advanced by loop_period seconds, loop_count is
    decremented and the run is saved.
    """
    due_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
    for rrun in due_runs:
        # Create job from template
        job = rrun.job
        info = rpc_utils.get_job_info(job)
        options = job.get_object_dict()

        host_objects = info['hosts']
        one_time_hosts = info['one_time_hosts']
        metahost_objects = info['meta_hosts']
        atomic_group = info['atomic_group']

        for host in one_time_hosts or []:
            this_host = models.Host.create_one_time_host(host.hostname)
            host_objects.append(this_host)

        try:
            rpc_utils.create_new_job(owner=rrun.owner.login,
                                     options=options,
                                     host_objects=host_objects,
                                     metahost_objects=metahost_objects,
                                     atomic_group=atomic_group)
        except Exception as ex:
            # Best effort: log and carry on to the rescheduling below.
            logging.exception(ex)
            #TODO send email

        if rrun.loop_count == 1:
            rrun.delete()
        elif rrun.loop_count != 0: # if not infinite loop
            # calculate new start_date
            difference = datetime.timedelta(seconds=rrun.loop_period)
            rrun.start_date = rrun.start_date + difference
            rrun.loop_count -= 1
            rrun.save()
        # NOTE(review): with loop_count == 0 nothing is updated, so the
        # start_date never advances and the run remains due on the next
        # call - confirm this is intended.
1007
1008
# Resolve the dispatcher base class through the site-extension hook, passing
# BaseDispatcher as the class to build on.
# NOTE(review): presumably BaseDispatcher is used as-is when no
# site_monitor_db module exists - confirm against utils.import_site_class.
SiteDispatcher = utils.import_site_class(
        __file__,
        'autotest_lib.scheduler.site_monitor_db',
        'SiteDispatcher',
        BaseDispatcher)


class Dispatcher(SiteDispatcher):
    """The dispatcher class; adds nothing on top of SiteDispatcher."""
1015
1016
class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task. An agent wraps
    around an AgentTask mainly to associate the AgentTask with the queue_entry
    and host ids.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested.  The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: An instance of an AgentTask.
        """
        self.task = task

        # This is filled in by Dispatcher.add_agent()
        self.dispatcher = None

        # Mirror the ids of the wrapped task.
        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        """Mark the agent started and poll its task unless already finished."""
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        """@returns True once the task finished or was aborted."""
        return self.finished


    def abort(self):
        """Abort the task; the agent finishes only if the task honored it."""
        if not self.task:
            return
        self.task.abort()
        # tasks can choose to ignore aborts
        if self.task.aborted:
            self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001072
showardd3dc1992009-04-22 21:01:40 +00001073
class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
    """
    Common functionality for QueueTask and HostlessQueueTask

    The task covers a list of queue entries; the job is taken from the
    first entry of that list.
    """
    def __init__(self, queue_entries):
        """
        @param queue_entries: Non-empty sequence of queue entries.
        """
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries


    def _keyval_path(self):
        """@returns Path of the keyval file in the working directory."""
        working_dir = self._working_directory()
        return os.path.join(working_dir, self._KEYVAL_FILE)


    def _write_control_file(self, execution_path):
        """Attach the job's control file to the given execution path.

        @param execution_path: Execution path to attach the file to.
        @returns The path returned by the drone manager.
        """
        return _drone_manager.attach_file_to_execution(
                execution_path, self.job.control_file)


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _command_line(self):
        """@returns The autoserv command line for this task, as a list."""
        first_entry = self.queue_entries[0]
        control_path = self._write_control_file(first_entry.execution_path())

        # Hostless entries have no hostname to pass along.
        machine_names = [entry.host.hostname
                         for entry in self.queue_entries
                         if not entry.is_hostless()]
        hostnames = ','.join(machine_names)

        execution_tag = self.queue_entries[0].execution_tag()
        extra_args = ['-P', execution_tag, '-n',
                      _drone_manager.absolute_path(control_path)]
        params = _autoserv_command_line(hostnames, extra_args,
                                        job=self.job, verbose=False)
        if self.job.is_image_update_job():
            params += ['--image', self.job.update_image_path]

        return params


    @property
    def num_processes(self):
        """Number of queue entries handled by this task."""
        return len(self.queue_entries)


    @property
    def owner_username(self):
        """Owner of the job this task runs."""
        return self.job.owner


    def _working_directory(self):
        """@returns The execution path shared by the queue entries."""
        return self._get_consistent_execution_path(self.queue_entries)


    def prolog(self):
        """Write the pre-job keyvals and mark all entries as Running."""
        keyval_dict = None
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = self.job.keyval_dict()
        keyval_dict[queued_key] = queued_time

        group_name = self.queue_entries[0].get_group_name()
        if group_name:
            keyval_dict['host_group_name'] = group_name
        self._write_keyvals_before_job(keyval_dict)

        running = models.HostQueueEntry.Status.RUNNING
        for entry in self.queue_entries:
            entry.set_status(running)
            entry.set_started_on_now()


    def _write_lost_process_error_file(self):
        """Write the lost-process error into the 'job_failure' file."""
        failure_path = os.path.join(self._working_directory(), 'job_failure')
        _drone_manager.write_lines_to_file(failure_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Record job completion; a no-op when there is no monitor."""
        if self.monitor:
            self._write_job_finished()

            if self.monitor.lost_process:
                self._write_lost_process_error_file()


    def _write_status_comment(self, comment):
        """Write an INFO line with the given comment to status.log.

        @param comment: Text placed in the INFO line.
        """
        status_log = os.path.join(self._working_directory(), 'status.log')
        _drone_manager.write_lines_to_file(
                status_log,
                ['INFO\t----\t----\t' + comment],
                paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job, and when, in keyvals and status.log.

        Does nothing unless the monitor has a process.
        """
        monitor = self.monitor
        if not (monitor and monitor.has_process()):
            return

        # build up sets of all the aborted_by and aborted_on values
        aborters = set()
        abort_times = set()
        for entry in self.queue_entries:
            if not entry.aborted_by:
                continue
            aborters.add(entry.aborted_by)
            abort_times.add(
                    int(time.mktime(entry.aborted_on.timetuple())))

        # extract some actual, unique aborted by value and write it out
        # TODO(showard): this conditional is now obsolete, we just need to leave
        # it in temporarily for backwards compatibility over upgrades.  delete
        # soon.
        assert len(aborters) <= 1
        if len(aborters) == 1:
            aborted_by_value = aborters.pop()
            aborted_on_value = max(abort_times)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(
                datetime.datetime.fromtimestamp(aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        """Abort the task, log the abort and finish the task."""
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        """Run the parent epilog, then finish the task."""
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001206
1207
class QueueTask(AbstractQueueTask):
    """Queue task for queue entries that run on hosts."""
    def __init__(self, queue_entries):
        """
        @param queue_entries: Queue entries handled by this task.
        """
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Check entry and host statuses, then mark the hosts as Running."""
        hqe_statuses = (models.HostQueueEntry.Status.STARTING,
                        models.HostQueueEntry.Status.RUNNING)
        host_statuses = (models.Host.Status.PENDING,
                         models.Host.Status.RUNNING)
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=hqe_statuses,
                allowed_host_statuses=host_statuses)

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            host = entry.host
            self._write_host_keyvals(host)
            host.set_status(models.Host.Status.RUNNING)
            host.update_field('dirty', 1)


    def _finish_task(self):
        """Move every entry to Gathering; its host is set to Running."""
        super(QueueTask, self)._finish_task()

        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """@returns The parent command line plus '--verify_job_repo_url'."""
        base_invocation = super(QueueTask, self)._command_line()
        return base_invocation + ['--verify_job_repo_url']
1241
1242
class HostlessQueueTask(AbstractQueueTask):
    """Queue task built around exactly one queue entry."""

    def __init__(self, queue_entry):
        """Wrap the single entry in a list for the base class.

        @param queue_entry: the one queue entry this task operates on.
        """
        entries = [queue_entry]
        super(HostlessQueueTask, self).__init__(entries)
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        """Set execution_subdir to 'hostless', then run the base prolog."""
        entry = self.queue_entries[0]
        entry.update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        """Run the base finish step; set PARSING unless still STARTING."""
        super(HostlessQueueTask, self)._finish_task()

        # A job enters the database in Starting status. On each tick the
        # scheduler looks at every job in Starting status and checks whether
        # it can be started. If a limit is hit (e.g.
        # max_hostless_jobs_per_drone or max_jobs_started_per_cycle), those
        # jobs are left in Starting. Otherwise their status is changed to
        # Running and an autoserv process is started on a drone for each.
        # An entry that is still in Starting therefore has no process yet,
        # so there is no log to parse or collect. Without this check the
        # scheduler would raise an exception, because execution_subdir has
        # not been given a value for this queue entry.
        entry = self.queue_entries[0]
        statuses = models.HostQueueEntry.Status
        if entry.status != statuses.STARTING:
            entry.set_status(statuses.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001271
1272
# Call main() only when this file is executed directly, not when imported.
if __name__ == '__main__':
    main()