#!/usr/bin/python -u
#pylint: disable-msg=C0111

"""
Autotest scheduler
"""

import datetime, optparse, os, signal
import sys, time
import logging, gc

import common
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models, rpc_utils
from autotest_lib.scheduler import agent_task, drone_manager, drones
from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
from autotest_lib.scheduler import postjob_task
from autotest_lib.scheduler import query_managers
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.server import autoserv_utils
from autotest_lib.site_utils.graphite import stats

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db_manager = None
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)
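
# A minimal sketch of the matching global_config.ini entry (illustrative only;
# it assumes scheduler_config.CONFIG_SECTION is 'SCHEDULER', the section named
# in the enable_scheduler error message below, and the usual key: value
# config syntax):
#
#     [SCHEDULER]
#     inline_host_acquisition: True
#
# When True, the dispatcher acquires and releases hosts for jobs inline with
# its tick; when False, it only handles jobs that already have hosts assigned.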


def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)


def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise scheduler_lib.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
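
# handle_sigint is registered for SIGINT in initialize() below via
# signal.signal(signal.SIGINT, handle_sigint). It only flips the _shutdown
# flag; the loop in main_without_exception_handling() notices the flag on its
# next pass and exits after the current tick. A rough sketch of the shutdown
# path (illustrative, not part of the scheduler):
#
#     kill -INT <monitor_db pid>  ->  handle_sigint() sets _shutdown = True
#     while not _shutdown: ...    ->  loop exits; queued emails are sent, the
#                                     status server, drone manager and DB
#                                     connection are shut down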


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host',
            default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
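
# A hedged usage sketch of _autoserv_command_line (the hostname, tag, control
# file path and job object below are made-up placeholders; the '-P' and '-n'
# flags mirror the real call in AbstractQueueTask._command_line further down):
#
#     params = _autoserv_command_line(
#             'host1.example.com',
#             ['-P', 'some-execution-tag', '-n', '/path/to/control'],
#             job=some_job, verbose=False)
#
# The returned list is a complete autoserv invocation (executable plus
# arguments) that callers hand to the drone manager for execution.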


class BaseDispatcher(object):


    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db_manager.get_connection(), user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db_manager.get_connection())
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly release a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def _log_extra_msg(self, msg):
        if self._extra_debugging:
            logging.debug(msg)


    def tick(self):
        """
        An instrumented tick(): we log when each major step begins so we can
        try to figure out where most of the tick time is being spent.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)
            # If an ID has no more active agents associated with it, there is
            # no need to keep it in the dictionary. Otherwise, the scheduler
            # will keep an unnecessarily big dictionary until it is restarted.
            if not agent_dict[object_id]:
                agent_dict.pop(object_id)


    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatcher's list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to (1) a dict keyed on the queue_entry_ids given to it and
        (2) a dict keyed on the host_ids given to it. So theoretically, a
        host can have any number of agents associated with it, and each of
        them can have any special agent task, though in practice we never
        see > 1 agent/task per host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
showard170873e2009-01-07 00:22:26 +0000392
393 def get_agents_for_entry(self, queue_entry):
394 """
395 Find agents corresponding to the specified queue_entry.
396 """
showardd3dc1992009-04-22 21:01:40 +0000397 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000398
399
400 def host_has_agent(self, host):
401 """
402 Determine if there is currently an Agent present using this host.
403 """
404 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000405
406
jadmanski0afbb632008-06-06 21:10:57 +0000407 def remove_agent(self, agent):
408 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000409 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
410 agent)
411 self._unregister_agent_for_ids(self._queue_entry_agents,
412 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000413
414
showard8cc058f2009-09-08 16:26:33 +0000415 def _host_has_scheduled_special_task(self, host):
416 return bool(models.SpecialTask.objects.filter(host__id=host.id,
417 is_active=False,
418 is_complete=False))
419
420
jadmanski0afbb632008-06-06 21:10:57 +0000421 def _recover_processes(self):
showardd1195652009-12-08 22:21:02 +0000422 agent_tasks = self._create_recovery_agent_tasks()
423 self._register_pidfiles(agent_tasks)
showard170873e2009-01-07 00:22:26 +0000424 _drone_manager.refresh()
showardd1195652009-12-08 22:21:02 +0000425 self._recover_tasks(agent_tasks)
showard8cc058f2009-09-08 16:26:33 +0000426 self._recover_pending_entries()
showardb8900452009-10-12 20:31:01 +0000427 self._check_for_unrecovered_verifying_entries()
showard170873e2009-01-07 00:22:26 +0000428 self._reverify_remaining_hosts()
429 # reinitialize drones after killing orphaned processes, since they can
430 # leave around files when they die
431 _drone_manager.execute_actions()
432 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000433
showard170873e2009-01-07 00:22:26 +0000434
showardd1195652009-12-08 22:21:02 +0000435 def _create_recovery_agent_tasks(self):
436 return (self._get_queue_entry_agent_tasks()
437 + self._get_special_task_agent_tasks(is_active=True))
438
439
440 def _get_queue_entry_agent_tasks(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700441 """
442 Get agent tasks for all hqe in the specified states.
443
444 Loosely this translates to taking a hqe in one of the specified states,
445 say parsing, and getting an AgentTask for it, like the FinalReparseTask,
446 through _get_agent_task_for_queue_entry. Each queue entry can only have
447 one agent task at a time, but there might be multiple queue entries in
448 the group.
449
450 @return: A list of AgentTasks.
451 """
showardd1195652009-12-08 22:21:02 +0000452 # host queue entry statuses handled directly by AgentTasks (Verifying is
453 # handled through SpecialTasks, so is not listed here)
454 statuses = (models.HostQueueEntry.Status.STARTING,
455 models.HostQueueEntry.Status.RUNNING,
456 models.HostQueueEntry.Status.GATHERING,
mbligh4608b002010-01-05 18:22:35 +0000457 models.HostQueueEntry.Status.PARSING,
458 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000459 status_list = ','.join("'%s'" % status for status in statuses)
jamesrenc44ae992010-02-19 00:12:54 +0000460 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardd1195652009-12-08 22:21:02 +0000461 where='status IN (%s)' % status_list)
Alex Miller47cd2472013-11-25 15:20:04 -0800462 stats.Gauge('scheduler.jobs_per_tick').send(
463 'running', len(queue_entries))
showardd1195652009-12-08 22:21:02 +0000464
465 agent_tasks = []
466 used_queue_entries = set()
467 for entry in queue_entries:
468 if self.get_agents_for_entry(entry):
469 # already being handled
470 continue
471 if entry in used_queue_entries:
472 # already picked up by a synchronous job
473 continue
474 agent_task = self._get_agent_task_for_queue_entry(entry)
475 agent_tasks.append(agent_task)
476 used_queue_entries.update(agent_task.queue_entries)
477 return agent_tasks
showard170873e2009-01-07 00:22:26 +0000478
479
showardd1195652009-12-08 22:21:02 +0000480 def _get_special_task_agent_tasks(self, is_active=False):
481 special_tasks = models.SpecialTask.objects.filter(
482 is_active=is_active, is_complete=False)
483 return [self._get_agent_task_for_special_task(task)
484 for task in special_tasks]
485
486
487 def _get_agent_task_for_queue_entry(self, queue_entry):
488 """
beeps8bb1f7d2013-08-05 01:30:09 -0700489 Construct an AgentTask instance for the given active HostQueueEntry.
490
showardd1195652009-12-08 22:21:02 +0000491 @param queue_entry: a HostQueueEntry
beeps8bb1f7d2013-08-05 01:30:09 -0700492 @return: an AgentTask to run the queue entry
showardd1195652009-12-08 22:21:02 +0000493 """
494 task_entries = queue_entry.job.get_group_entries(queue_entry)
495 self._check_for_duplicate_host_entries(task_entries)
496
497 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
498 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000499 if queue_entry.is_hostless():
500 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000501 return QueueTask(queue_entries=task_entries)
502 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
beeps5e2bb4a2013-10-28 11:26:45 -0700503 return postjob_task.GatherLogsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000504 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
beeps5e2bb4a2013-10-28 11:26:45 -0700505 return postjob_task.FinalReparseTask(queue_entries=task_entries)
mbligh4608b002010-01-05 18:22:35 +0000506 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
beeps5e2bb4a2013-10-28 11:26:45 -0700507 return postjob_task.ArchiveResultsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000508
Prashanth B0e960282014-05-13 19:38:28 -0700509 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800510 '_get_agent_task_for_queue_entry got entry with '
511 'invalid status %s: %s' % (queue_entry.status, queue_entry))
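
    # Status -> AgentTask mapping implemented above (a reading aid derived
    # directly from the code):
    #
    #     Starting/Running -> QueueTask (HostlessQueueTask if hostless)
    #     Gathering        -> postjob_task.GatherLogsTask
    #     Parsing          -> postjob_task.FinalReparseTask
    #     Archiving        -> postjob_task.ArchiveResultsTask
    #
    # Verifying entries never reach this method; they are recovered through
    # SpecialTasks (see _get_queue_entry_agent_tasks above).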


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise scheduler_lib.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a null HQE. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a
        SpecialAgentTask object contains an HQE, it is passed on to the
        special agent task, which creates a HostQueueEntry and saves it as
        its queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise scheduler_lib.SchedulerError(
                'No AgentTask class for task', str(special_task))
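
    # The dispatch above assumes each prejob task class advertises the
    # models.SpecialTask.Task value it handles through a TASK_TYPE class
    # attribute, e.g. prejob_task.VerifyTask presumably carries
    # TASK_TYPE == models.SpecialTask.Task.VERIFY, so a queued VERIFY row
    # becomes a VerifyTask agent. The attribute values themselves live in
    # prejob_task and are not spelled out in this file.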


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where="status = '%s'" % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise scheduler_lib.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.

        Special tasks include PreJobTasks like verify, reset and cleanup.
        They are created through _schedule_new_jobs and associated with an
        HQE. This method translates SpecialTasks to the appropriate AgentTask
        and adds them to the dispatcher's agents list, so _handle_agents can
        execute them.
        """
        for task in self._job_query_manager.get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning', "
                "'Provisioning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and return them.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._job_query_manager.get_pending_queue_entries(
                only_hostless=not _inline_host_acquisition)
        if not queue_entries:
            return []
        return queue_entries


    def _schedule_hostless_job(self, queue_entry):
        """Schedule a hostless (suite) job.

        @param queue_entry: The queue_entry representing the hostless job.
        """
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_host_job(self, host, queue_entry):
        """Schedules a job on the given host.

        1. Assign the host to the hqe, if it isn't already assigned.
        2. Create a SpecialAgentTask for the hqe.
        3. Activate the hqe.

        @param queue_entry: The job to schedule.
        @param host: The host to schedule the job on.
        """
        if self.host_has_agent(host):
            host_agent_task = list(self._host_agents.get(host.id))[0].task
            subject = 'Host with agents assigned to an HQE'
            message = ('HQE: %s assigned host %s, but the host has '
                       'agent: %s for queue_entry %s. The HQE '
                       'will have to try and acquire a host next tick ' %
                       (queue_entry, host.hostname, host_agent_task,
                        host_agent_task.queue_entry))
            email_manager.manager.enqueue_notify_email(subject, message)
        else:
            self._host_scheduler.schedule_host_job(host, queue_entry)


    def _schedule_new_jobs(self):
        """
        Find any new HQEs and call schedule_pre_job_tasks for them.

        This involves setting the status of the HQE and creating a row in the
        db corresponding to the special task, through
        scheduler_models._queue_special_task. The new db row is then added as
        an agent to the dispatcher through _schedule_special_tasks and
        scheduled for execution on the drone through _handle_agents.
        """
        queue_entries = self._refresh_pending_queue_entries()

        key = 'scheduler.jobs_per_tick'
        new_hostless_jobs = 0
        new_jobs_with_hosts = 0
        new_jobs_need_hosts = 0
        host_jobs = []
        logging.debug('Processing %d queue_entries', len(queue_entries))

        for queue_entry in queue_entries:
            if queue_entry.is_hostless():
                self._schedule_hostless_job(queue_entry)
                new_hostless_jobs = new_hostless_jobs + 1
            else:
                host_jobs.append(queue_entry)
                new_jobs_need_hosts = new_jobs_need_hosts + 1

        stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
        if not host_jobs:
            return
        if not _inline_host_acquisition:
            message = ('Found %s jobs that need hosts though '
                       '_inline_host_acquisition=%s. Will acquire hosts.' %
                       ([str(job) for job in host_jobs],
                        _inline_host_acquisition))
            email_manager.manager.enqueue_notify_email(
                    'Processing unexpected host acquisition requests', message)
        jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
        for host_assignment in jobs_with_hosts:
            self._schedule_host_job(host_assignment.host, host_assignment.job)
            new_jobs_with_hosts = new_jobs_with_hosts + 1

        stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
        stats.Gauge(key).send('new_jobs_without_hosts',
                              new_jobs_need_hosts - new_jobs_with_hosts)


    def _schedule_running_host_queue_entries(self):
        """
        Adds agents to the dispatcher.

        Any AgentTask, like the QueueTask, is wrapped in an Agent. The
        QueueTask, for example, will have a job with a control file, and
        the agent will have methods that poll, abort and check if the queue
        task is finished. The dispatcher runs the agent_task, as well as
        other agents in its _agents member, through _handle_agents, by
        calling the Agent's tick().

        This method creates an agent for each HQE in one of (starting,
        running, gathering, parsing, archiving) states, and adds it to the
        dispatcher so it is handled by _handle_agents.
        """
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _find_aborting(self):
        """
        Looks through the afe_host_queue_entries for an aborted entry.

        The aborted bit is set on an HQE in many ways, the most common
        being when a user requests an abort through the frontend, which
        results in an rpc from the afe to abort_host_queue_entries.
        """
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted=1 and complete=0'):
            logging.info('Aborting %s', entry)

            # The task would have started off with both is_complete and
            # is_active = False. Aborted tasks are neither active nor
            # complete. For all currently active tasks this will happen
            # through the agent, but we need to manually update the special
            # tasks that haven't started yet, because they don't have agents.
            models.SpecialTask.objects.filter(is_active=False,
                    queue_entry_id=entry.id).update(is_complete=True)

            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _find_aborted_special_tasks(self):
        """
        Find SpecialTasks that have been marked for abortion.

        Poll the database looking for SpecialTasks that are active
        and have been marked for abortion, then abort them.
        """

        # The completed and active bits are very important when it comes
        # to scheduler correctness. The active bit is set through the prolog
        # of a special task, and reset through the cleanup method of the
        # SpecialAgentTask. The cleanup is called both through the abort and
        # epilog. The complete bit is set in several places, and in general
        # a hanging job will have is_active=1 is_complete=0, while a special
        # task which completed will have is_active=0 is_complete=1. To check
        # aborts we directly check active because the complete bit is set in
        # several places, including the epilog of agent tasks.
        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
                                                          is_aborted=True)
        for task in aborted_tasks:
            # There are 2 ways to get the agent associated with a task,
            # through the host and through the hqe. A special task
            # always needs a host, but doesn't always need a hqe.
            for agent in self._host_agents.get(task.host.id, []):
                if isinstance(agent.task, agent_task.SpecialAgentTask):

                    # The epilog performs critical actions such as
                    # queueing the next SpecialTask, requeuing the
                    # hqe etc, however it doesn't actually kill the
                    # monitor process and set the 'done' bit. Epilogs
                    # assume that the job failed, and that the monitor
                    # process has already written an exit code. The
                    # done bit is a necessary condition for
                    # _handle_agents to schedule any more special
                    # tasks against the host, and it must be set
                    # in addition to is_active, is_complete and success.
                    agent.task.epilog()
                    agent.task.abort()


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
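
    # Worked example of the per-cycle throttle above (numbers are made up):
    # with max_processes_started_per_cycle = 50, an agent needing 10 processes
    # is allowed while num_started_this_cycle is at most 40 (40 + 10 > 50 is
    # False); at 41 it is rejected, _handle_agents then sets
    # have_reached_limit, and every later multi-process agent in the cycle is
    # skipped. An agent needing 60 processes can still pass this check when it
    # is the first one started in the cycle, provided it also fits under the
    # drone manager's max_runnable_processes limit.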


    def _handle_agents(self):
        """
        Handles agents of the dispatcher.

        Appropriate Agents are added to the dispatcher through
        _schedule_running_host_queue_entries. These agents each
        have a task. This method runs the agent's task through
        agent.tick() leading to:
            agent.start
                prolog -> AgentTask's prolog
                          For each queue entry:
                              sets host status/status to Running
                              sets started_on in afe_host_queue_entries
                run -> AgentTask's run
                       Creates PidfileRunMonitor
                       Queues the autoserv command line for this AgentTask
                       via the drone manager. These commands are executed
                       through the drone manager's execute actions.
            poll -> AgentTask's/BaseAgentTask's poll
                    checks the monitor's exit_code.
                    Executes epilog if task is finished.
                    Executes AgentTask's _finish_task
                    finish_task is usually responsible for setting the status
                    of the HQE/host, and updating its active and complete
                    fields.

            agent.is_done
                Removes the agent from the dispatcher's _agents queue.
                is_done checks the finished bit on the agent, which is
                set based on the Agent's task. During the agent's poll
                we check to see if the monitor process has exited in
                its finish method, and set the success member of the
                task based on this exit code.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_cycle += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes. %d added this cycle.',
                     _drone_manager.total_running_processes(),
                     num_started_this_cycle)


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                # TODO: send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0:  # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


SiteDispatcher = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    pass


class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task. An agent wraps
    around an AgentTask mainly to associate the AgentTask with the queue_entry
    and host ids.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                 update its internal state. If the task succeeded.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested. The task must
                  set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: An instance of an AgentTask.
        """
        self.task = task

        # This is filled in by Dispatcher.add_agent_task()
        self.dispatcher = None

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        self.started = True
        if not self.finished:
            self.task.poll()
            if self.task.is_done():
                self.finished = True


    def is_done(self):
        return self.finished


    def abort(self):
        if self.task:
            self.task.abort()
            if self.task.aborted:
                # tasks can choose to ignore aborts
                self.finished = True
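
    # Lifecycle sketch (illustrative): the dispatcher wraps every AgentTask in
    # an Agent via add_agent_task() and then drives it from _handle_agents():
    #
    #     agent = Agent(task)         # done inside Dispatcher.add_agent_task
    #     agent.tick()                # polls the task until task.is_done()
    #     if agent.is_done():         # finished flag set by tick()/abort()
    #         dispatcher.remove_agent(agent)
    #
    # Only the wiring shown in this file is assumed here; the concrete
    # AgentTask subclasses live in agent_task, prejob_task and postjob_task.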
showard20f9bdd2009-04-29 19:48:33 +00001063
showardd3dc1992009-04-22 21:01:40 +00001064
class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
    """
    Common functionality for QueueTask and HostlessQueueTask.
    """
    def __init__(self, queue_entries):
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _write_control_file(self, execution_path):
        control_path = _drone_manager.attach_file_to_execution(
                execution_path, self.job.control_file)
        return control_path


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _command_line(self):
        execution_path = self.queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        hostnames = ','.join(entry.host.hostname
                             for entry in self.queue_entries
                             if not entry.is_hostless())

        execution_tag = self.queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self.job, verbose=False)
        if self.job.is_image_update_job():
            params += ['--image', self.job.update_image_path]

        return params
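
    # For illustration (hedged: the base flag set comes from
    # _autoserv_command_line / autoserv_utils, so only the pieces visible above
    # are certain, and the hostnames and paths here are invented), the
    # resulting invocation looks roughly like:
    #
    #   autoserv ... -m host1,host2 -P <execution_tag> -n <absolute control path>
    #
    # Hostless entries contribute no hostnames, and image-update jobs append
    # '--image <update_image_path>'.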


    @property
    def num_processes(self):
        return len(self.queue_entries)


    @property
    def owner_username(self):
        return self.job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def prolog(self):
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = self.job.keyval_dict()
        keyval_dict[queued_key] = queued_time
        group_name = self.queue_entries[0].get_group_name()
        if group_name:
            keyval_dict['host_group_name'] = group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.set_started_on_now()
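
    # Sketch of what prolog() writes to the keyvals file in the execution
    # directory before autoserv starts (illustrative values; the queued-time
    # keyval name comes from _job_queued_keyval() and the rest from the job's
    # own keyval_dict()):
    #
    #   <queued_key>=1385430000      # epoch seconds the job entered the queue
    #   host_group_name=group0       # only present when a group name exists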


    def _write_lost_process_error_file(self):
        error_file_path = os.path.join(self._working_directory(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()


    def _write_status_comment(self, comment):
        _drone_manager.write_lines_to_file(
                os.path.join(self._working_directory(), 'status.log'),
                ['INFO\t----\t----\t' + comment],
                paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract the single, unique aborted_by value and write it out
        # TODO(showard): this conditional is now obsolete; it is kept
        # temporarily for backwards compatibility across upgrades. Delete soon.
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
                aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))
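
    # Illustrative result of _log_abort() (values invented): the job-level
    # keyvals gain
    #   aborted_by=some_user
    #   aborted_on=1385430000
    # and status.log gains a line of the form
    #   INFO\t----\t----\tJob aborted by some_user on 2013-11-25 17:40:00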


    def abort(self):
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(AbstractQueueTask, self).epilog()
        self._finish_task()

class QueueTask(AbstractQueueTask):
    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.host.set_status(models.Host.Status.RUNNING)
            queue_entry.host.update_field('dirty', 1)


    def _finish_task(self):
        super(QueueTask, self)._finish_task()

        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
            queue_entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        invocation = super(QueueTask, self)._command_line()
        return invocation + ['--verify_job_repo_url']
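
    # Net effect of a QueueTask over its lifetime, as implemented by prolog()
    # and _finish_task() above: each HostQueueEntry moves from Starting/Running
    # to Running and finally to Gathering, while its host is marked Running and
    # flagged dirty (so it is cleaned up later) for the duration of the
    # autoserv run.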


class HostlessQueueTask(AbstractQueueTask):
    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        self.queue_entries[0].update_field('execution_subdir', 'hostless')
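        # Assumed rationale: the synthetic 'hostless' execution_subdir gives
        # the entry a usable execution path (working directory, keyvals,
        # status log) even though no host is attached to it.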
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()

        # When a job is added to the database its initial status is always
        # Starting. On each scheduler tick, the scheduler finds all jobs in
        # Starting status and checks whether any of them can be started. If
        # the scheduler hits a limit, e.g. max_hostless_jobs_per_drone or
        # max_jobs_started_per_cycle, it leaves those jobs in Starting status.
        # Otherwise the jobs' status is changed to Running and an autoserv
        # process is started on a drone for each of them.
        # If the entry is still in Starting status, the process has not
        # started yet, so there is nothing to parse and no logs to collect.
        # Without this check the scheduler would raise an exception, because
        # execution_subdir for this queue entry does not have a value yet.
        hqe = self.queue_entries[0]
        if hqe.status != models.HostQueueEntry.Status.STARTING:
            hqe.set_status(models.HostQueueEntry.Status.PARSING)
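

# Illustrative wiring (hedged: the real scheduling path lives in the Dispatcher
# and scheduler_models code elsewhere in the scheduler, and the helper below is
# invented purely as a sketch):
#
#     def _start_hostless_entry(dispatcher, queue_entry):
#         # Wrap the entry in a HostlessQueueTask and hand it to the
#         # dispatcher, which will tick() the resulting Agent until the task
#         # finishes and the entry moves on to Parsing.
#         dispatcher.add_agent(Agent(HostlessQueueTask(queue_entry=queue_entry)))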


if __name__ == '__main__':
    main()