blob: e785c04bfe16599b0e64f59ea349ebb23b971cf3 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Aviv Keshet225bdfe2013-03-05 10:10:08 -08002#pylint: disable-msg=C0111
mbligh36768f02008-02-22 18:28:33 +00003
4"""
5Autotest scheduler
6"""
showard909c7a62008-07-15 21:52:38 +00007
Aviv Keshet225bdfe2013-03-05 10:10:08 -08008import datetime, optparse, os, signal
beeps5e2bb4a2013-10-28 11:26:45 -07009import sys, time
Aviv Keshet225bdfe2013-03-05 10:10:08 -080010import logging, gc
showard402934a2009-12-21 22:20:47 +000011
Alex Miller05d7b4c2013-03-04 07:49:38 -080012import common
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000014
15import django.db
16
Prashanth B0e960282014-05-13 19:38:28 -070017from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070018from autotest_lib.client.common_lib import utils
Michael Liangda8c60a2014-06-03 13:24:51 -070019from autotest_lib.client.common_lib.cros.graphite import stats
Prashanth B0e960282014-05-13 19:38:28 -070020from autotest_lib.frontend.afe import models, rpc_utils
beeps5e2bb4a2013-10-28 11:26:45 -070021from autotest_lib.scheduler import agent_task, drone_manager, drones
22from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
23from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070024from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070025from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070026from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000027from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080028from autotest_lib.scheduler import status_server, scheduler_config
Prashanth Bf66d51b2014-05-06 12:42:25 -070029from autotest_lib.scheduler import scheduler_lib
Aviv Keshet308e7362013-05-21 14:43:16 -070030from autotest_lib.server import autoserv_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080031
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the environment to override the autotest root.  Note: has_key() is
# deprecated (and removed in Python 3); the `in` operator is the portable
# equivalent.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""
52
# Database connection manager and raw connection; both set in initialize().
_db_manager = None
_db = None
# Flipped to True by handle_signal() to ask the main loop to exit.
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
# Process-wide drone manager singleton, cached by initialize_globals().
_drone_manager = None
# When True, the dispatcher acquires/releases hosts inline with each tick.
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)
65
mbligh36768f02008-02-22 18:28:33 +000066
def _parser_path_default(install_dir):
    """Return the stock location of the TKO parser under *install_dir*."""
    parser = os.path.join(install_dir, 'tko', 'parse')
    return parser
# Resolve the parser path through an optional site hook, falling back to the
# stock <install_dir>/tko/parse location provided by _parser_path_default.
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
73
mbligh36768f02008-02-22 18:28:33 +000074
def _site_init_monitor_db_dummy():
    """Default site-init hook: nothing to do, return an empty mapping."""
    return dict()
77
78
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled without a default."""
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise scheduler_lib.SchedulerError(
            'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000084
85
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    # Currently the only consistency check is the drone-set configuration.
    _verify_default_drone_set_exists()
89
90
def main():
    """Scheduler entry point.

    Logs any exception that escapes the main loop (SystemExit passes
    through untouched) and always removes the scheduler pid file on exit.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000102
103
def main_without_exception_handling():
    """Parse arguments, start the status server and run the dispatcher loop.

    Exits with status 1 if the scheduler is disabled in the global config.
    The single positional argument is the results directory, which also
    becomes the working directory for the lifetime of the process.
    """
    # Logging must come up before anything else so startup problems are
    # captured; destination is overridable via the environment.
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Optional site hook; defaults to the no-op dummy above.
    site_init = utils.import_site_function(__file__,
            "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
            _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop: tick until a signal or the status server requests
        # shutdown.
        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except Exception:
        email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    # Orderly teardown: flush mail, stop the status server, then release
    # drones and the database connection.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000171
172
def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler: request a clean exit of the main loop."""
    logging.info("Shutdown request received.")
    global _shutdown
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000177
178
def initialize():
    """One-time process setup before the dispatcher loop starts.

    Writes the pid file (aborting if another scheduler holds it), connects
    to the database, installs signal handlers, and initializes the drone
    manager from the configured drone list.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to run two schedulers at once.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    # Under --test, point at the stress-test database instead of production.
    if _testing_mode:
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    # Comma-separated drone hostnames from config; default is a single
    # localhost drone.
    drones = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000213
214
def initialize_globals():
    """Cache the drone manager singleton in this module's global."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
218
219
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv invocation for a job as a list of argv tokens.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns The autoserv command line as a list of executable + parameters.
    """
    command = autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
    return command
showard87ba02a2009-04-20 19:37:32 +0000237
238
Simran Basia858a232012-08-21 11:04:37 -0700239class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800240
241
    def __init__(self):
        """Set up dispatcher state, cleanup helpers and config knobs."""
        # Agents currently being driven by this dispatcher.
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db)
        # Indexes from host id / queue entry id to the set of agents
        # registered against that id.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        # Interval between GC stat reports, converted from minutes to
        # seconds; defaults to 6 hours.
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        # Debug-log verbosity switches (per-tick and extra-detail).
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
273
mbligh36768f02008-02-22 18:28:33 +0000274
    def initialize(self, recover_hosts=True):
        """Prime cleanup helpers and recover state from a previous run.

        @param recover_hosts: also attempt to recover dead hosts when True.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000284
285
Simran Basi0ec94dd2012-08-28 09:50:10 -0700286 def _log_tick_msg(self, msg):
287 if self._tick_debug:
288 logging.debug(msg)
289
290
Simran Basidef92872012-09-20 13:34:34 -0700291 def _log_extra_msg(self, msg):
292 if self._extra_debugging:
293 logging.debug(msg)
294
295
    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.
        """
        # NOTE(review): step order below appears deliberate (refresh is
        # triggered before scheduling and synced before abort handling) —
        # do not reorder without checking the drone manager contract.
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
        _drone_manager.trigger_refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
        _drone_manager.sync_refresh()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        # Only the two tail steps are individually timed via statsd.
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000339
showard97aed502008-11-04 02:01:24 +0000340
    def _run_cleanup(self):
        """Give both cleanup helpers a chance to run.

        The "maybe" in the helper names suggests each decides internally
        whether enough time has passed — presumably safe to call every tick.
        """
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000344
mbligh36768f02008-02-22 18:28:33 +0000345
showardf13a9e22009-12-18 22:54:09 +0000346 def _garbage_collection(self):
347 threshold_time = time.time() - self._seconds_between_garbage_stats
348 if threshold_time < self._last_garbage_stats_time:
349 # Don't generate these reports very often.
350 return
351
352 self._last_garbage_stats_time = time.time()
353 # Force a full level 0 collection (because we can, it doesn't hurt
354 # at this interval).
355 gc.collect()
356 logging.info('Logging garbage collector stats on tick %d.',
357 self._tick_count)
358 gc_stats._log_garbage_collector_stats()
359
360
showard170873e2009-01-07 00:22:26 +0000361 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
362 for object_id in object_ids:
363 agent_dict.setdefault(object_id, set()).add(agent)
364
365
366 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
367 for object_id in object_ids:
368 assert object_id in agent_dict
369 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700370 # If an ID has no more active agent associated, there is no need to
371 # keep it in the dictionary. Otherwise, scheduler will keep an
372 # unnecessarily big dictionary until being restarted.
373 if not agent_dict[object_id]:
374 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000375
376
    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatchers list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to 1. a dict against the queue_entry_ids given to it 2. A dict
        against the host_ids given to it. So theoretically, a host can have any
        number of agents associated with it, and each of them can have any
        special agent task, though in practice we never see > 1 agent/task per
        host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        # Back-pointer so the agent can reach its dispatcher.
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000397
showard170873e2009-01-07 00:22:26 +0000398
399 def get_agents_for_entry(self, queue_entry):
400 """
401 Find agents corresponding to the specified queue_entry.
402 """
showardd3dc1992009-04-22 21:01:40 +0000403 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000404
405
406 def host_has_agent(self, host):
407 """
408 Determine if there is currently an Agent present using this host.
409 """
410 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000411
412
    def remove_agent(self, agent):
        """Forget *agent*: drop it from the list and both id indexes."""
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000419
420
    def _host_has_scheduled_special_task(self, host):
        """True if *host* has a queued (not started, not finished) SpecialTask."""
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))
425
426
    def _recover_processes(self):
        """Reattach to autoserv processes left over from a previous scheduler.

        Pidfiles must be registered before the drone refresh so the refresh
        can pick up their state; orphan handling happens inside
        _recover_tasks.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000439
showard170873e2009-01-07 00:22:26 +0000440
showardd1195652009-12-08 22:21:02 +0000441 def _create_recovery_agent_tasks(self):
442 return (self._get_queue_entry_agent_tasks()
443 + self._get_special_task_agent_tasks(is_active=True))
444
445
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)
        # Report the running-entry count to statsd each call.
        stats.Gauge('scheduler.jobs_per_tick').send(
                'running', len(queue_entries))

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            # A group task may cover several entries; mark them all consumed.
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000484
485
showardd1195652009-12-08 22:21:02 +0000486 def _get_special_task_agent_tasks(self, is_active=False):
487 special_tasks = models.SpecialTask.objects.filter(
488 is_active=is_active, is_complete=False)
489 return [self._get_agent_task_for_special_task(task)
490 for task in special_tasks]
491
492
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        @raises SchedulerError: if the entry's status has no AgentTask mapping.
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            # Hostless entries get their own task type; grouped entries run
            # as a single QueueTask.
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000518
519
520 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000521 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
522 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000523 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000524 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000525 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000526 if using_host:
showardd1195652009-12-08 22:21:02 +0000527 self._assert_host_has_no_agent(task_entry)
528
529
    def _assert_host_has_no_agent(self, entry):
        """Raise SchedulerError if entry's host already has an agent.

        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            # Grab an arbitrary agent from the host's set for the message.
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise scheduler_lib.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))
539
540
541 def _get_agent_task_for_special_task(self, special_task):
542 """
543 Construct an AgentTask class to run the given SpecialTask and add it
544 to this dispatcher.
beeps8bb1f7d2013-08-05 01:30:09 -0700545
546 A special task is create through schedule_special_tasks, but only if
547 the host doesn't already have an agent. This happens through
548 add_agent_task. All special agent tasks are given a host on creation,
549 and a Null hqe. To create a SpecialAgentTask object, you need a
550 models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
551 object contains a hqe it's passed on to the special agent task, which
552 creates a HostQueueEntry and saves it as it's queue_entry.
553
showardd1195652009-12-08 22:21:02 +0000554 @param special_task: a models.SpecialTask instance
555 @returns an AgentTask to run this SpecialTask
556 """
557 self._assert_host_has_no_agent(special_task)
558
beeps5e2bb4a2013-10-28 11:26:45 -0700559 special_agent_task_classes = (prejob_task.CleanupTask,
560 prejob_task.VerifyTask,
561 prejob_task.RepairTask,
562 prejob_task.ResetTask,
563 prejob_task.ProvisionTask)
564
showardd1195652009-12-08 22:21:02 +0000565 for agent_task_class in special_agent_task_classes:
566 if agent_task_class.TASK_TYPE == special_task.task:
567 return agent_task_class(task=special_task)
568
Prashanth B0e960282014-05-13 19:38:28 -0700569 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800570 'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000571
572
573 def _register_pidfiles(self, agent_tasks):
574 for agent_task in agent_tasks:
575 agent_task.register_necessary_pidfiles()
576
577
    def _recover_tasks(self, agent_tasks):
        """Recover each task's process and re-add it to this dispatcher.

        Any orphaned autoserv process not claimed by a recovered task is
        passed on to _check_for_remaining_orphan_processes.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            # A task that reattached to a live process claims it, so it is
            # no longer an orphan.
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000588
589
    def _get_unassigned_entries(self, status):
        """Yield queue entries in *status* that no agent is handling."""
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                           % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry
597
598
showard6878e8b2009-07-20 22:37:45 +0000599 def _check_for_remaining_orphan_processes(self, orphans):
600 if not orphans:
601 return
602 subject = 'Unrecovered orphan autoserv processes remain'
603 message = '\n'.join(str(process) for process in orphans)
604 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000605
606 die_on_orphans = global_config.global_config.get_config_value(
607 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
608
609 if die_on_orphans:
610 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000611
showard170873e2009-01-07 00:22:26 +0000612
showard8cc058f2009-09-08 16:26:33 +0000613 def _recover_pending_entries(self):
614 for entry in self._get_unassigned_entries(
615 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000616 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000617 entry.on_pending()
618
619
showardb8900452009-10-12 20:31:01 +0000620 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000621 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000622 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
623 unrecovered_hqes = []
624 for queue_entry in queue_entries:
625 special_tasks = models.SpecialTask.objects.filter(
626 task__in=(models.SpecialTask.Task.CLEANUP,
627 models.SpecialTask.Task.VERIFY),
628 queue_entry__id=queue_entry.id,
629 is_complete=False)
630 if special_tasks.count() == 0:
631 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000632
showardb8900452009-10-12 20:31:01 +0000633 if unrecovered_hqes:
634 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700635 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000636 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000637 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000638
639
showard65db3932009-10-28 19:54:35 +0000640 def _schedule_special_tasks(self):
641 """
642 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700643
644 Special tasks include PreJobTasks like verify, reset and cleanup.
645 They are created through _schedule_new_jobs and associated with a hqe
646 This method translates SpecialTasks to the appropriate AgentTask and
647 adds them to the dispatchers agents list, so _handle_agents can execute
648 them.
showard65db3932009-10-28 19:54:35 +0000649 """
Prashanth B4ec98672014-05-15 10:44:54 -0700650 # When the host scheduler is responsible for acquisition we only want
651 # to run tasks with leased hosts. All hqe tasks will already have
652 # leased hosts, and we don't want to run frontend tasks till the host
653 # scheduler has vetted the assignment. Note that this doesn't include
654 # frontend tasks with hosts leased by other active hqes.
655 for task in self._job_query_manager.get_prioritized_special_tasks(
656 only_tasks_with_leased_hosts=not _inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000657 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000658 continue
showardd1195652009-12-08 22:21:02 +0000659 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000660
661
showard170873e2009-01-07 00:22:26 +0000662 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000663 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000664 # should never happen
showarded2afea2009-07-07 20:54:07 +0000665 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000666 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000667 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700668 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000669 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000670
671
jadmanski0afbb632008-06-06 21:10:57 +0000672 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000673 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700674 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000675 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000676 if self.host_has_agent(host):
677 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000678 continue
showard8cc058f2009-09-08 16:26:33 +0000679 if self._host_has_scheduled_special_task(host):
680 # host will have a special task scheduled on the next cycle
681 continue
showard170873e2009-01-07 00:22:26 +0000682 if print_message:
showardb18134f2009-03-20 20:52:18 +0000683 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000684 models.SpecialTask.objects.create(
685 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000686 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000687
688
jadmanski0afbb632008-06-06 21:10:57 +0000689 def _recover_hosts(self):
690 # recover "Repair Failed" hosts
691 message = 'Reverifying dead host %s'
692 self._reverify_hosts_where("status = 'Repair Failed'",
693 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000694
695
showard89f84db2009-03-12 20:39:13 +0000696 def _refresh_pending_queue_entries(self):
697 """
698 Lookup the pending HostQueueEntries and call our HostScheduler
699 refresh() method given that list. Return the list.
700
701 @returns A list of pending HostQueueEntries sorted in priority order.
702 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700703 queue_entries = self._job_query_manager.get_pending_queue_entries(
704 only_hostless=not _inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000705 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000706 return []
showard89f84db2009-03-12 20:39:13 +0000707 return queue_entries
708
709
showarda9545c02009-12-18 22:44:26 +0000710 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800711 """Schedule a hostless (suite) job.
712
713 @param queue_entry: The queue_entry representing the hostless job.
714 """
showarda9545c02009-12-18 22:44:26 +0000715 self.add_agent_task(HostlessQueueTask(queue_entry))
jamesren47bd7372010-03-13 00:58:17 +0000716 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000717
718
beepscc9fc702013-12-02 12:45:38 -0800719 def _schedule_host_job(self, host, queue_entry):
720 """Schedules a job on the given host.
721
722 1. Assign the host to the hqe, if it isn't already assigned.
723 2. Create a SpecialAgentTask for the hqe.
724 3. Activate the hqe.
725
726 @param queue_entry: The job to schedule.
727 @param host: The host to schedule the job on.
728 """
729 if self.host_has_agent(host):
730 host_agent_task = list(self._host_agents.get(host.id))[0].task
731 subject = 'Host with agents assigned to an HQE'
732 message = ('HQE: %s assigned host %s, but the host has '
733 'agent: %s for queue_entry %s. The HQE '
beepsf7d35162014-02-13 16:42:13 -0800734 'will have to try and acquire a host next tick ' %
beepscc9fc702013-12-02 12:45:38 -0800735 (queue_entry, host.hostname, host_agent_task,
736 host_agent_task.queue_entry))
737 email_manager.manager.enqueue_notify_email(subject, message)
beepscc9fc702013-12-02 12:45:38 -0800738 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700739 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800740
741
showard89f84db2009-03-12 20:39:13 +0000742 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700743 """
744 Find any new HQEs and call schedule_pre_job_tasks for it.
745
746 This involves setting the status of the HQE and creating a row in the
747 db corresponding the the special task, through
748 scheduler_models._queue_special_task. The new db row is then added as
749 an agent to the dispatcher through _schedule_special_tasks and
750 scheduled for execution on the drone through _handle_agents.
751 """
showard89f84db2009-03-12 20:39:13 +0000752 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000753
beepscc9fc702013-12-02 12:45:38 -0800754 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700755 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700756 new_jobs_with_hosts = 0
757 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800758 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700759 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000760
beepscc9fc702013-12-02 12:45:38 -0800761 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000762 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000763 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700764 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000765 else:
beepscc9fc702013-12-02 12:45:38 -0800766 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700767 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700768
beepsb255fc52013-10-13 23:28:54 -0700769 stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
beepscc9fc702013-12-02 12:45:38 -0800770 if not host_jobs:
771 return
Prashanth Bf66d51b2014-05-06 12:42:25 -0700772 if not _inline_host_acquisition:
773 message = ('Found %s jobs that need hosts though '
774 '_inline_host_acquisition=%s. Will acquire hosts.' %
775 ([str(job) for job in host_jobs],
776 _inline_host_acquisition))
777 email_manager.manager.enqueue_notify_email(
778 'Processing unexpected host acquisition requests', message)
779 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
780 for host_assignment in jobs_with_hosts:
781 self._schedule_host_job(host_assignment.host, host_assignment.job)
782 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800783
beepsb255fc52013-10-13 23:28:54 -0700784 stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
785 stats.Gauge(key).send('new_jobs_without_hosts',
786 new_jobs_need_hosts - new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000787
788
showard8cc058f2009-09-08 16:26:33 +0000789 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700790 """
791 Adds agents to the dispatcher.
792
793 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
794 QueueTask for example, will have a job with a control file, and
795 the agent will have methods that poll, abort and check if the queue
796 task is finished. The dispatcher runs the agent_task, as well as
797 other agents in it's _agents member, through _handle_agents, by
798 calling the Agents tick().
799
800 This method creates an agent for each HQE in one of (starting, running,
801 gathering, parsing, archiving) states, and adds it to the dispatcher so
802 it is handled by _handle_agents.
803 """
showardd1195652009-12-08 22:21:02 +0000804 for agent_task in self._get_queue_entry_agent_tasks():
805 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000806
807
808 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000809 for entry in scheduler_models.HostQueueEntry.fetch(
810 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000811 task = entry.job.schedule_delayed_callback_task(entry)
812 if task:
showardd1195652009-12-08 22:21:02 +0000813 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000814
815
jadmanski0afbb632008-06-06 21:10:57 +0000816 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700817 """
818 Looks through the afe_host_queue_entries for an aborted entry.
819
820 The aborted bit is set on an HQE in many ways, the most common
821 being when a user requests an abort through the frontend, which
822 results in an rpc from the afe to abort_host_queue_entries.
823 """
jamesrene7c65cb2010-06-08 20:38:10 +0000824 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000825 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700826 where='aborted=1 and complete=0'):
showardf4a2e502009-07-28 20:06:39 +0000827 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800828
829 # The task would have started off with both is_complete and
830 # is_active = False. Aborted tasks are neither active nor complete.
831 # For all currently active tasks this will happen through the agent,
832 # but we need to manually update the special tasks that haven't
833 # started yet, because they don't have agents.
834 models.SpecialTask.objects.filter(is_active=False,
835 queue_entry_id=entry.id).update(is_complete=True)
836
showardd3dc1992009-04-22 21:01:40 +0000837 for agent in self.get_agents_for_entry(entry):
838 agent.abort()
839 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000840 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700841 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000842 for job in jobs_to_stop:
843 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000844
845
beeps8bb1f7d2013-08-05 01:30:09 -0700846 def _find_aborted_special_tasks(self):
847 """
848 Find SpecialTasks that have been marked for abortion.
849
850 Poll the database looking for SpecialTasks that are active
851 and have been marked for abortion, then abort them.
852 """
853
854 # The completed and active bits are very important when it comes
855 # to scheduler correctness. The active bit is set through the prolog
856 # of a special task, and reset through the cleanup method of the
857 # SpecialAgentTask. The cleanup is called both through the abort and
858 # epilog. The complete bit is set in several places, and in general
859 # a hanging job will have is_active=1 is_complete=0, while a special
860 # task which completed will have is_active=0 is_complete=1. To check
861 # aborts we directly check active because the complete bit is set in
862 # several places, including the epilog of agent tasks.
863 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
864 is_aborted=True)
865 for task in aborted_tasks:
866 # There are 2 ways to get the agent associated with a task,
867 # through the host and through the hqe. A special task
868 # always needs a host, but doesn't always need a hqe.
869 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700870 if isinstance(agent.task, agent_task.SpecialAgentTask):
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000871
beeps8bb1f7d2013-08-05 01:30:09 -0700872 # The epilog preforms critical actions such as
873 # queueing the next SpecialTask, requeuing the
874 # hqe etc, however it doesn't actually kill the
875 # monitor process and set the 'done' bit. Epilogs
876 # assume that the job failed, and that the monitor
877 # process has already written an exit code. The
878 # done bit is a necessary condition for
879 # _handle_agents to schedule any more special
880 # tasks against the host, and it must be set
881 # in addition to is_active, is_complete and success.
882 agent.task.epilog()
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000883 agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700884
885
showard324bf812009-01-20 23:23:38 +0000886 def _can_start_agent(self, agent, num_started_this_cycle,
887 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000888 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000889 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000890 return True
891 # don't allow any nonzero-process agents to run after we've reached a
892 # limit (this avoids starvation of many-process agents)
893 if have_reached_limit:
894 return False
895 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000896 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000897 agent.task.owner_username,
898 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000899 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000900 return False
901 # if a single agent exceeds the per-cycle throttling, still allow it to
902 # run when it's the first agent in the cycle
903 if num_started_this_cycle == 0:
904 return True
905 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +0000906 if (num_started_this_cycle + agent.task.num_processes >
907 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000908 return False
909 return True
910
911
jadmanski0afbb632008-06-06 21:10:57 +0000912 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700913 """
914 Handles agents of the dispatcher.
915
916 Appropriate Agents are added to the dispatcher through
917 _schedule_running_host_queue_entries. These agents each
918 have a task. This method runs the agents task through
919 agent.tick() leading to:
920 agent.start
921 prolog -> AgentTasks prolog
922 For each queue entry:
923 sets host status/status to Running
924 set started_on in afe_host_queue_entries
925 run -> AgentTasks run
926 Creates PidfileRunMonitor
927 Queues the autoserv command line for this AgentTask
928 via the drone manager. These commands are executed
929 through the drone managers execute actions.
930 poll -> AgentTasks/BaseAgentTask poll
931 checks the monitors exit_code.
932 Executes epilog if task is finished.
933 Executes AgentTasks _finish_task
934 finish_task is usually responsible for setting the status
935 of the HQE/host, and updating it's active and complete fileds.
936
937 agent.is_done
938 Removed the agent from the dispatchers _agents queue.
939 Is_done checks the finished bit on the agent, that is
940 set based on the Agents task. During the agents poll
941 we check to see if the monitor process has exited in
942 it's finish method, and set the success member of the
943 task based on this exit code.
944 """
jadmanski0afbb632008-06-06 21:10:57 +0000945 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000946 have_reached_limit = False
947 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -0700948 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +0000949 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -0700950 self._log_extra_msg('Processing Agent with Host Ids: %s and '
951 'queue_entry ids:%s' % (agent.host_ids,
952 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +0000953 if not agent.started:
showard324bf812009-01-20 23:23:38 +0000954 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000955 have_reached_limit):
956 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -0700957 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +0000958 continue
showardd1195652009-12-08 22:21:02 +0000959 num_started_this_cycle += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -0700960 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +0000961 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -0700962 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +0000963 if agent.is_done():
Simran Basidef92872012-09-20 13:34:34 -0700964 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +0000965 self.remove_agent(agent)
Simran Basi3f6717d2012-09-13 15:21:22 -0700966 logging.info('%d running processes. %d added this cycle.',
967 _drone_manager.total_running_processes(),
968 num_started_this_cycle)
mbligh36768f02008-02-22 18:28:33 +0000969
970
showard29f7cd22009-04-29 21:16:24 +0000971 def _process_recurring_runs(self):
972 recurring_runs = models.RecurringRun.objects.filter(
973 start_date__lte=datetime.datetime.now())
974 for rrun in recurring_runs:
975 # Create job from template
976 job = rrun.job
977 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +0000978 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +0000979
980 host_objects = info['hosts']
981 one_time_hosts = info['one_time_hosts']
982 metahost_objects = info['meta_hosts']
983 dependencies = info['dependencies']
984 atomic_group = info['atomic_group']
985
986 for host in one_time_hosts or []:
987 this_host = models.Host.create_one_time_host(host.hostname)
988 host_objects.append(this_host)
989
990 try:
991 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +0000992 options=options,
showard29f7cd22009-04-29 21:16:24 +0000993 host_objects=host_objects,
994 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +0000995 atomic_group=atomic_group)
996
997 except Exception, ex:
998 logging.exception(ex)
999 #TODO send email
1000
1001 if rrun.loop_count == 1:
1002 rrun.delete()
1003 else:
1004 if rrun.loop_count != 0: # if not infinite loop
1005 # calculate new start_date
1006 difference = datetime.timedelta(seconds=rrun.loop_period)
1007 rrun.start_date = rrun.start_date + difference
1008 rrun.loop_count -= 1
1009 rrun.save()
1010
1011
Simran Basia858a232012-08-21 11:04:37 -07001012SiteDispatcher = utils.import_site_class(
1013 __file__, 'autotest_lib.scheduler.site_monitor_db',
1014 'SiteDispatcher', BaseDispatcher)
1015
1016class Dispatcher(SiteDispatcher):
1017 pass
1018
1019
mbligh36768f02008-02-22 18:28:33 +00001020class Agent(object):
showard77182562009-06-10 00:16:05 +00001021 """
Alex Miller47715eb2013-07-24 03:34:01 -07001022 An agent for use by the Dispatcher class to perform a task. An agent wraps
1023 around an AgentTask mainly to associate the AgentTask with the queue_entry
1024 and host ids.
showard77182562009-06-10 00:16:05 +00001025
1026 The following methods are required on all task objects:
1027 poll() - Called periodically to let the task check its status and
1028 update its internal state. If the task succeeded.
1029 is_done() - Returns True if the task is finished.
1030 abort() - Called when an abort has been requested. The task must
1031 set its aborted attribute to True if it actually aborted.
1032
1033 The following attributes are required on all task objects:
1034 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001035 success - bool, True if this task succeeded.
1036 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1037 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001038 """
1039
1040
showard418785b2009-11-23 20:19:59 +00001041 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001042 """
Alex Miller47715eb2013-07-24 03:34:01 -07001043 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001044 """
showard8cc058f2009-09-08 16:26:33 +00001045 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001046
showard77182562009-06-10 00:16:05 +00001047 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001048 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001049
showard8cc058f2009-09-08 16:26:33 +00001050 self.queue_entry_ids = task.queue_entry_ids
1051 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001052
showard8cc058f2009-09-08 16:26:33 +00001053 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001054 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001055
1056
jadmanski0afbb632008-06-06 21:10:57 +00001057 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001058 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001059 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001060 self.task.poll()
1061 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001062 self.finished = True
showardec113162008-05-08 00:52:49 +00001063
1064
jadmanski0afbb632008-06-06 21:10:57 +00001065 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001066 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001067
1068
showardd3dc1992009-04-22 21:01:40 +00001069 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001070 if self.task:
1071 self.task.abort()
1072 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001073 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001074 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001075
showardd3dc1992009-04-22 21:01:40 +00001076
beeps5e2bb4a2013-10-28 11:26:45 -07001077class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001078 """
1079 Common functionality for QueueTask and HostlessQueueTask
1080 """
1081 def __init__(self, queue_entries):
1082 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001083 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001084 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001085
1086
showard73ec0442009-02-07 02:05:20 +00001087 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001088 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001089
1090
jamesrenc44ae992010-02-19 00:12:54 +00001091 def _write_control_file(self, execution_path):
1092 control_path = _drone_manager.attach_file_to_execution(
1093 execution_path, self.job.control_file)
1094 return control_path
1095
1096
Aviv Keshet308e7362013-05-21 14:43:16 -07001097 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001098 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001099 execution_path = self.queue_entries[0].execution_path()
1100 control_path = self._write_control_file(execution_path)
1101 hostnames = ','.join(entry.host.hostname
1102 for entry in self.queue_entries
1103 if not entry.is_hostless())
1104
1105 execution_tag = self.queue_entries[0].execution_tag()
1106 params = _autoserv_command_line(
1107 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001108 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001109 _drone_manager.absolute_path(control_path)],
1110 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001111 if self.job.is_image_update_job():
1112 params += ['--image', self.job.update_image_path]
1113
jamesrenc44ae992010-02-19 00:12:54 +00001114 return params
showardd1195652009-12-08 22:21:02 +00001115
1116
1117 @property
1118 def num_processes(self):
1119 return len(self.queue_entries)
1120
1121
1122 @property
1123 def owner_username(self):
1124 return self.job.owner
1125
1126
1127 def _working_directory(self):
1128 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001129
1130
jadmanski0afbb632008-06-06 21:10:57 +00001131 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001132 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001133 keyval_dict = self.job.keyval_dict()
1134 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001135 group_name = self.queue_entries[0].get_group_name()
1136 if group_name:
1137 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001138 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001139 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001140 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001141 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001142
1143
showard35162b02009-03-03 02:17:30 +00001144 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001145 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001146 _drone_manager.write_lines_to_file(error_file_path,
1147 [_LOST_PROCESS_ERROR])
1148
1149
showardd3dc1992009-04-22 21:01:40 +00001150 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001151 if not self.monitor:
1152 return
1153
showardd9205182009-04-27 20:09:55 +00001154 self._write_job_finished()
1155
showard35162b02009-03-03 02:17:30 +00001156 if self.monitor.lost_process:
1157 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001158
jadmanskif7fa2cc2008-10-01 14:13:23 +00001159
showardcbd74612008-11-19 21:42:02 +00001160 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001161 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001162 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001163 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001164 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001165
1166
jadmanskif7fa2cc2008-10-01 14:13:23 +00001167 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001168 if not self.monitor or not self.monitor.has_process():
1169 return
1170
jadmanskif7fa2cc2008-10-01 14:13:23 +00001171 # build up sets of all the aborted_by and aborted_on values
1172 aborted_by, aborted_on = set(), set()
1173 for queue_entry in self.queue_entries:
1174 if queue_entry.aborted_by:
1175 aborted_by.add(queue_entry.aborted_by)
1176 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1177 aborted_on.add(t)
1178
1179 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001180 # TODO(showard): this conditional is now obsolete, we just need to leave
1181 # it in temporarily for backwards compatibility over upgrades. delete
1182 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001183 assert len(aborted_by) <= 1
1184 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001185 aborted_by_value = aborted_by.pop()
1186 aborted_on_value = max(aborted_on)
1187 else:
1188 aborted_by_value = 'autotest_system'
1189 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001190
showarda0382352009-02-11 23:36:43 +00001191 self._write_keyval_after_job("aborted_by", aborted_by_value)
1192 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001193
showardcbd74612008-11-19 21:42:02 +00001194 aborted_on_string = str(datetime.datetime.fromtimestamp(
1195 aborted_on_value))
1196 self._write_status_comment('Job aborted by %s on %s' %
1197 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001198
1199
jadmanski0afbb632008-06-06 21:10:57 +00001200 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001201 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001202 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001203 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001204
1205
jadmanski0afbb632008-06-06 21:10:57 +00001206 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001207 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001208 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001209
1210
class QueueTask(AbstractQueueTask):
    """AbstractQueueTask for jobs that run against one or more hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Sanity-check statuses, then mark entries and hosts Running."""
        allowed_hqe = (models.HostQueueEntry.Status.STARTING,
                       models.HostQueueEntry.Status.RUNNING)
        allowed_host = (models.Host.Status.PENDING,
                        models.Host.Status.RUNNING)
        self._check_queue_entry_statuses(self.queue_entries,
                                         allowed_hqe_statuses=allowed_hqe,
                                         allowed_host_statuses=allowed_host)

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)


    def _finish_task(self):
        """Move entries to Gathering while keeping their hosts Running."""
        super(QueueTask, self)._finish_task()

        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Append the job-repo-url verification flag to the base command."""
        base_invocation = super(QueueTask, self)._command_line()
        return base_invocation + ['--verify_job_repo_url']
1244
1245
class HostlessQueueTask(AbstractQueueTask):
    """AbstractQueueTask for a job that runs without any host."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        # Hostless entries have no per-host directory; results live under
        # a fixed 'hostless' execution subdir.
        self.queue_entries[0].update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()

        entry = self.queue_entries[0]
        # Jobs enter the database in Starting status; the scheduler only
        # moves them to Running (and spawns an autoserv process) if it is
        # not over limits such as max_hostless_jobs_per_drone or
        # max_processes_started_per_cycle.  An entry still in Starting
        # therefore never ran, has no logs to parse, and may lack an
        # execution_subdir value — setting PARSING would make the parser
        # raise.  Skip it.
        if entry.status == models.HostQueueEntry.Status.STARTING:
            return
        entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001275
1276
# Script entry point: delegate to main() (defined elsewhere in this file)
# when run directly rather than imported.
if __name__ == '__main__':
    main()