blob: 2e4f7e58bee30000d7f2aea27a83ce8d6d644e06 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Aviv Keshet225bdfe2013-03-05 10:10:08 -08002#pylint: disable-msg=C0111
mbligh36768f02008-02-22 18:28:33 +00003
4"""
5Autotest scheduler
6"""
showard909c7a62008-07-15 21:52:38 +00007
Dan Shif6c65bd2014-08-29 16:15:07 -07008import datetime
9import gc
10import logging
11import optparse
12import os
13import signal
14import sys
15import time
showard402934a2009-12-21 22:20:47 +000016
Alex Miller05d7b4c2013-03-04 07:49:38 -080017import common
showard21baa452008-10-21 00:08:39 +000018from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000019
20import django.db
21
Prashanth B0e960282014-05-13 19:38:28 -070022from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070023from autotest_lib.client.common_lib import utils
Michael Liangda8c60a2014-06-03 13:24:51 -070024from autotest_lib.client.common_lib.cros.graphite import stats
Prashanth B0e960282014-05-13 19:38:28 -070025from autotest_lib.frontend.afe import models, rpc_utils
Fang Dengc330bee2014-10-21 18:10:55 -070026from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070027from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
28from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070029from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070030from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070031from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000032from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080033from autotest_lib.scheduler import status_server, scheduler_config
Prashanth Bf66d51b2014-05-06 12:42:25 -070034from autotest_lib.scheduler import scheduler_lib
Aviv Keshet308e7362013-05-21 14:43:16 -070035from autotest_lib.server import autoserv_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080036
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the environment to override the autotest root. Membership test with
# 'in' replaces dict.has_key(), which is deprecated and removed in Python 3.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

# Make server-side modules importable without requiring an installed package.
if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000050
# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level state: _db_manager/_db are populated by initialize();
# _shutdown is flipped by handle_signal() to stop the main loop.
_db_manager = None
_db = None
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
# Drone manager singleton, bound in initialize_globals().
_drone_manager = None
# When True the dispatcher acquires/releases hosts inline with each tick;
# see BaseDispatcher.__init__ for how this selects the host scheduler.
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)
70
mbligh36768f02008-02-22 18:28:33 +000071
mbligh83c1e9e2009-05-01 23:10:41 +000072def _site_init_monitor_db_dummy():
73 return {}
74
75
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled with no default set."""
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise scheduler_lib.SchedulerError(
            'Drone sets are enabled, but no default is set')
82
def _sanity_check():
    """Verify the configuration is self-consistent before the scheduler runs."""
    _verify_default_drone_set_exists()
87
def main():
    """Scheduler entry point.

    Runs the real body, logs any exception that escapes it (SystemExit is
    propagated silently as the normal termination path), and always removes
    the scheduler pid file on the way out.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        # Deliberate exit; no stacktrace wanted.
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
100
def main_without_exception_handling():
    """Parse arguments, run sanity/config checks, and drive the tick loop.

    Startup order: logging, option parsing, production-settings check,
    enable_scheduler gate, site hook, chdir into the results dir, status
    server, then initialize() and the Dispatcher loop. The loop runs until
    a signal sets _shutdown or the status server requests a stop.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        # Exactly one positional argument (the results dir) is required.
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Site hook: falls back to the dummy no-op when no site module exists.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        # Swap in the dummy autoserv binary; initialize() also reacts to
        # _testing_mode by pointing at the stress-test database.
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except Exception:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Orderly teardown: flush mail, stop the status server, then release
    # drones and the database connection.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000177
178
def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler: ask the main loop to exit after this tick.

    @param signum: delivered signal number (unused).
    @param frame: interrupted stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000183
184
def initialize():
    """One-time scheduler startup: pidfile, database, signals, drones.

    Aborts (sys.exit) if another monitor_db instance already holds the
    pidfile. Must run before the Dispatcher starts ticking.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start a second scheduler against the same results dir.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point at the stress-test database instead of the real one.
        global_config.global_config.override_config_value(
            scheduler_lib.DB_CONFIG_SECTION, 'database',
            'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    # Both signals funnel into handle_signal, which sets _shutdown.
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    # Drone topology comes from global config; both default to localhost.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000219
220
def initialize_globals():
    """Bind the module-level _drone_manager to the process-wide singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
224
225
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """Build the autoserv command line for the given machines.

    @param machines: string - a machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args: list - additional arguments to pass to autoserv.
    @param job: Job object - if supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry: HostQueueEntry object - if supplied and no Job object
            was supplied, this will be used to look up the Job object.
    @returns The autoserv command line as a list of executable + parameters.
    """
    command = autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
    return command
showard87ba02a2009-04-20 19:37:32 +0000243
244
Simran Basia858a232012-08-21 11:04:37 -0700245class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800246
247
    def __init__(self):
        """Read dispatcher tuning from global config and set up bookkeeping."""
        # Active Agent objects; indexed below by host id and queue-entry id.
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db)
        # host id -> set of Agents, queue-entry id -> set of Agents.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        # Throttle for _garbage_collection (config value is in minutes).
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        # Debug-logging switches consumed by _log_tick_msg/_log_extra_msg.
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
279
mbligh36768f02008-02-22 18:28:33 +0000280
showard915958d2009-04-22 21:00:58 +0000281 def initialize(self, recover_hosts=True):
282 self._periodic_cleanup.initialize()
283 self._24hr_upkeep.initialize()
284
jadmanski0afbb632008-06-06 21:10:57 +0000285 # always recover processes
286 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000287
jadmanski0afbb632008-06-06 21:10:57 +0000288 if recover_hosts:
289 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000290
291
Simran Basi0ec94dd2012-08-28 09:50:10 -0700292 def _log_tick_msg(self, msg):
293 if self._tick_debug:
294 logging.debug(msg)
295
296
Simran Basidef92872012-09-20 13:34:34 -0700297 def _log_extra_msg(self, msg):
298 if self._extra_debugging:
299 logging.debug(msg)
300
301
    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.

        One full scheduler iteration: GC stats, drone refresh, cleanup,
        scheduling passes, abort handling, agent polling, then flushing
        queued drone actions and emails. Step order matters: scheduling
        happens between trigger_refresh() and sync_refresh() so drone state
        is collected concurrently with the scheduling passes.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
        _drone_manager.trigger_refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
        _drone_manager.sync_refresh()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000345
showard97aed502008-11-04 02:01:24 +0000346
mblighf3294cc2009-04-08 21:17:38 +0000347 def _run_cleanup(self):
348 self._periodic_cleanup.run_cleanup_maybe()
349 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000350
mbligh36768f02008-02-22 18:28:33 +0000351
showardf13a9e22009-12-18 22:54:09 +0000352 def _garbage_collection(self):
353 threshold_time = time.time() - self._seconds_between_garbage_stats
354 if threshold_time < self._last_garbage_stats_time:
355 # Don't generate these reports very often.
356 return
357
358 self._last_garbage_stats_time = time.time()
359 # Force a full level 0 collection (because we can, it doesn't hurt
360 # at this interval).
361 gc.collect()
362 logging.info('Logging garbage collector stats on tick %d.',
363 self._tick_count)
364 gc_stats._log_garbage_collector_stats()
365
366
showard170873e2009-01-07 00:22:26 +0000367 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
368 for object_id in object_ids:
369 agent_dict.setdefault(object_id, set()).add(agent)
370
371
372 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
373 for object_id in object_ids:
374 assert object_id in agent_dict
375 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700376 # If an ID has no more active agent associated, there is no need to
377 # keep it in the dictionary. Otherwise, scheduler will keep an
378 # unnecessarily big dictionary until being restarted.
379 if not agent_dict[object_id]:
380 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000381
382
showardd1195652009-12-08 22:21:02 +0000383 def add_agent_task(self, agent_task):
beeps8bb1f7d2013-08-05 01:30:09 -0700384 """
385 Creates and adds an agent to the dispatchers list.
386
387 In creating the agent we also pass on all the queue_entry_ids and
388 host_ids from the special agent task. For every agent we create, we
389 add it to 1. a dict against the queue_entry_ids given to it 2. A dict
390 against the host_ids given to it. So theoritically, a host can have any
391 number of agents associated with it, and each of them can have any
392 special agent task, though in practice we never see > 1 agent/task per
393 host at any time.
394
395 @param agent_task: A SpecialTask for the agent to manage.
396 """
showardd1195652009-12-08 22:21:02 +0000397 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000398 self._agents.append(agent)
399 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000400 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
401 self._register_agent_for_ids(self._queue_entry_agents,
402 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000403
showard170873e2009-01-07 00:22:26 +0000404
405 def get_agents_for_entry(self, queue_entry):
406 """
407 Find agents corresponding to the specified queue_entry.
408 """
showardd3dc1992009-04-22 21:01:40 +0000409 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000410
411
412 def host_has_agent(self, host):
413 """
414 Determine if there is currently an Agent present using this host.
415 """
416 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000417
418
jadmanski0afbb632008-06-06 21:10:57 +0000419 def remove_agent(self, agent):
420 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000421 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
422 agent)
423 self._unregister_agent_for_ids(self._queue_entry_agents,
424 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000425
426
showard8cc058f2009-09-08 16:26:33 +0000427 def _host_has_scheduled_special_task(self, host):
428 return bool(models.SpecialTask.objects.filter(host__id=host.id,
429 is_active=False,
430 is_complete=False))
431
432
    def _recover_processes(self):
        """Re-attach to work left over from a previous scheduler process.

        Order matters: pidfiles must be registered before the drone refresh
        so the refresh can report on them, and tasks are recovered before
        pending/verifying entries are checked.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000445
showard170873e2009-01-07 00:22:26 +0000446
showardd1195652009-12-08 22:21:02 +0000447 def _create_recovery_agent_tasks(self):
448 return (self._get_queue_entry_agent_tasks()
449 + self._get_special_task_agent_tasks(is_active=True))
450
451
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)
        # Export a running-jobs gauge for monitoring.
        stats.Gauge('scheduler.jobs_per_tick').send(
                'running', len(queue_entries))

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            # A task may cover a whole group of entries; mark them all used.
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000490
491
showardd1195652009-12-08 22:21:02 +0000492 def _get_special_task_agent_tasks(self, is_active=False):
493 special_tasks = models.SpecialTask.objects.filter(
494 is_active=is_active, is_complete=False)
495 return [self._get_agent_task_for_special_task(task)
496 for task in special_tasks]
497
498
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        The entry's status selects the task class; Starting/Running map to a
        queue task (hostless or grouped), Gathering/Parsing/Archiving map to
        the corresponding post-job task.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        @raises SchedulerError: if the entry's status has no matching task.
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000524
525
526 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000527 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
528 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000529 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000530 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000531 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000532 if using_host:
showardd1195652009-12-08 22:21:02 +0000533 self._assert_host_has_no_agent(task_entry)
534
535
536 def _assert_host_has_no_agent(self, entry):
537 """
538 @param entry: a HostQueueEntry or a SpecialTask
539 """
540 if self.host_has_agent(entry.host):
541 agent = tuple(self._host_agents.get(entry.host.id))[0]
Prashanth B0e960282014-05-13 19:38:28 -0700542 raise scheduler_lib.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000543 'While scheduling %s, host %s already has a host agent %s'
544 % (entry, entry.host, agent.task))
545
546
    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
        object contains a hqe it's passed on to the special agent task, which
        creates a HostQueueEntry and saves it as it's queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        @raises SchedulerError: if no task class matches special_task.task.
        """
        self._assert_host_has_no_agent(special_task)

        # Candidate classes; each declares the TASK_TYPE it handles.
        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise scheduler_lib.SchedulerError(
                'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000577
578
579 def _register_pidfiles(self, agent_tasks):
580 for agent_task in agent_tasks:
581 agent_task.register_necessary_pidfiles()
582
583
584 def _recover_tasks(self, agent_tasks):
585 orphans = _drone_manager.get_orphaned_autoserv_processes()
586
587 for agent_task in agent_tasks:
588 agent_task.recover()
589 if agent_task.monitor and agent_task.monitor.has_process():
590 orphans.discard(agent_task.monitor.get_process())
591 self.add_agent_task(agent_task)
592
593 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000594
595
showard8cc058f2009-09-08 16:26:33 +0000596 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000597 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
598 % status):
showard0db3d432009-10-12 20:29:15 +0000599 if entry.status == status and not self.get_agents_for_entry(entry):
600 # The status can change during iteration, e.g., if job.run()
601 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000602 yield entry
603
604
showard6878e8b2009-07-20 22:37:45 +0000605 def _check_for_remaining_orphan_processes(self, orphans):
606 if not orphans:
607 return
608 subject = 'Unrecovered orphan autoserv processes remain'
609 message = '\n'.join(str(process) for process in orphans)
610 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000611
612 die_on_orphans = global_config.global_config.get_config_value(
613 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
614
615 if die_on_orphans:
616 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000617
showard170873e2009-01-07 00:22:26 +0000618
showard8cc058f2009-09-08 16:26:33 +0000619 def _recover_pending_entries(self):
620 for entry in self._get_unassigned_entries(
621 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000622 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000623 entry.on_pending()
624
625
showardb8900452009-10-12 20:31:01 +0000626 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000627 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000628 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
629 unrecovered_hqes = []
630 for queue_entry in queue_entries:
631 special_tasks = models.SpecialTask.objects.filter(
632 task__in=(models.SpecialTask.Task.CLEANUP,
633 models.SpecialTask.Task.VERIFY),
634 queue_entry__id=queue_entry.id,
635 is_complete=False)
636 if special_tasks.count() == 0:
637 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000638
showardb8900452009-10-12 20:31:01 +0000639 if unrecovered_hqes:
640 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700641 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000642 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000643 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000644
645
showard65db3932009-10-28 19:54:35 +0000646 def _schedule_special_tasks(self):
647 """
648 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700649
650 Special tasks include PreJobTasks like verify, reset and cleanup.
651 They are created through _schedule_new_jobs and associated with a hqe
652 This method translates SpecialTasks to the appropriate AgentTask and
653 adds them to the dispatchers agents list, so _handle_agents can execute
654 them.
showard65db3932009-10-28 19:54:35 +0000655 """
Prashanth B4ec98672014-05-15 10:44:54 -0700656 # When the host scheduler is responsible for acquisition we only want
657 # to run tasks with leased hosts. All hqe tasks will already have
658 # leased hosts, and we don't want to run frontend tasks till the host
659 # scheduler has vetted the assignment. Note that this doesn't include
660 # frontend tasks with hosts leased by other active hqes.
661 for task in self._job_query_manager.get_prioritized_special_tasks(
662 only_tasks_with_leased_hosts=not _inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000663 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000664 continue
showardd1195652009-12-08 22:21:02 +0000665 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000666
667
showard170873e2009-01-07 00:22:26 +0000668 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000669 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000670 # should never happen
showarded2afea2009-07-07 20:54:07 +0000671 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000672 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000673 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700674 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000675 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000676
677
jadmanski0afbb632008-06-06 21:10:57 +0000678 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000679 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700680 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000681 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000682 if self.host_has_agent(host):
683 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000684 continue
showard8cc058f2009-09-08 16:26:33 +0000685 if self._host_has_scheduled_special_task(host):
686 # host will have a special task scheduled on the next cycle
687 continue
showard170873e2009-01-07 00:22:26 +0000688 if print_message:
showardb18134f2009-03-20 20:52:18 +0000689 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000690 models.SpecialTask.objects.create(
691 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000692 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000693
694
jadmanski0afbb632008-06-06 21:10:57 +0000695 def _recover_hosts(self):
696 # recover "Repair Failed" hosts
697 message = 'Reverifying dead host %s'
698 self._reverify_hosts_where("status = 'Repair Failed'",
699 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000700
701
showard89f84db2009-03-12 20:39:13 +0000702 def _refresh_pending_queue_entries(self):
703 """
704 Lookup the pending HostQueueEntries and call our HostScheduler
705 refresh() method given that list. Return the list.
706
707 @returns A list of pending HostQueueEntries sorted in priority order.
708 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700709 queue_entries = self._job_query_manager.get_pending_queue_entries(
710 only_hostless=not _inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000711 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000712 return []
showard89f84db2009-03-12 20:39:13 +0000713 return queue_entries
714
715
showarda9545c02009-12-18 22:44:26 +0000716 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800717 """Schedule a hostless (suite) job.
718
719 @param queue_entry: The queue_entry representing the hostless job.
720 """
showarda9545c02009-12-18 22:44:26 +0000721 self.add_agent_task(HostlessQueueTask(queue_entry))
Jakob Juelichd615a1e2014-09-04 11:48:36 -0700722
723 # Need to set execution_subdir before setting the status:
724 # After a restart of the scheduler, agents will be restored for HQEs in
725 # Starting, Running, Gathering, Parsing or Archiving. To do this, the
726 # execution_subdir is needed. Therefore it must be set before entering
727 # one of these states.
728 # Otherwise, if the scheduler was interrupted between setting the status
729 # and the execution_subdir, upon it's restart restoring agents would
730 # fail.
731 # Is there a way to get a status in one of these states without going
732 # through this code? Following cases are possible:
733 # - If it's aborted before being started:
734 # active bit will be 0, so there's nothing to parse, it will just be
735 # set to completed by _find_aborting. Critical statuses are skipped.
736 # - If it's aborted or it fails after being started:
737 # It was started, so this code was executed.
738 queue_entry.update_field('execution_subdir', 'hostless')
jamesren47bd7372010-03-13 00:58:17 +0000739 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000740
741
beepscc9fc702013-12-02 12:45:38 -0800742 def _schedule_host_job(self, host, queue_entry):
743 """Schedules a job on the given host.
744
745 1. Assign the host to the hqe, if it isn't already assigned.
746 2. Create a SpecialAgentTask for the hqe.
747 3. Activate the hqe.
748
749 @param queue_entry: The job to schedule.
750 @param host: The host to schedule the job on.
751 """
752 if self.host_has_agent(host):
753 host_agent_task = list(self._host_agents.get(host.id))[0].task
754 subject = 'Host with agents assigned to an HQE'
755 message = ('HQE: %s assigned host %s, but the host has '
756 'agent: %s for queue_entry %s. The HQE '
beepsf7d35162014-02-13 16:42:13 -0800757 'will have to try and acquire a host next tick ' %
beepscc9fc702013-12-02 12:45:38 -0800758 (queue_entry, host.hostname, host_agent_task,
759 host_agent_task.queue_entry))
760 email_manager.manager.enqueue_notify_email(subject, message)
beepscc9fc702013-12-02 12:45:38 -0800761 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700762 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800763
764
showard89f84db2009-03-12 20:39:13 +0000765 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700766 """
767 Find any new HQEs and call schedule_pre_job_tasks for it.
768
769 This involves setting the status of the HQE and creating a row in the
770 db corresponding the the special task, through
771 scheduler_models._queue_special_task. The new db row is then added as
772 an agent to the dispatcher through _schedule_special_tasks and
773 scheduled for execution on the drone through _handle_agents.
774 """
showard89f84db2009-03-12 20:39:13 +0000775 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000776
beepscc9fc702013-12-02 12:45:38 -0800777 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700778 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700779 new_jobs_with_hosts = 0
780 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800781 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700782 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000783
beepscc9fc702013-12-02 12:45:38 -0800784 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000785 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000786 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700787 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000788 else:
beepscc9fc702013-12-02 12:45:38 -0800789 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700790 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700791
beepsb255fc52013-10-13 23:28:54 -0700792 stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
beepscc9fc702013-12-02 12:45:38 -0800793 if not host_jobs:
794 return
Prashanth Bf66d51b2014-05-06 12:42:25 -0700795 if not _inline_host_acquisition:
796 message = ('Found %s jobs that need hosts though '
797 '_inline_host_acquisition=%s. Will acquire hosts.' %
798 ([str(job) for job in host_jobs],
799 _inline_host_acquisition))
800 email_manager.manager.enqueue_notify_email(
801 'Processing unexpected host acquisition requests', message)
802 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
803 for host_assignment in jobs_with_hosts:
804 self._schedule_host_job(host_assignment.host, host_assignment.job)
805 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800806
beepsb255fc52013-10-13 23:28:54 -0700807 stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
808 stats.Gauge(key).send('new_jobs_without_hosts',
809 new_jobs_need_hosts - new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000810
811
showard8cc058f2009-09-08 16:26:33 +0000812 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700813 """
814 Adds agents to the dispatcher.
815
816 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
817 QueueTask for example, will have a job with a control file, and
818 the agent will have methods that poll, abort and check if the queue
819 task is finished. The dispatcher runs the agent_task, as well as
820 other agents in it's _agents member, through _handle_agents, by
821 calling the Agents tick().
822
823 This method creates an agent for each HQE in one of (starting, running,
824 gathering, parsing, archiving) states, and adds it to the dispatcher so
825 it is handled by _handle_agents.
826 """
showardd1195652009-12-08 22:21:02 +0000827 for agent_task in self._get_queue_entry_agent_tasks():
828 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000829
830
831 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000832 for entry in scheduler_models.HostQueueEntry.fetch(
833 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000834 task = entry.job.schedule_delayed_callback_task(entry)
835 if task:
showardd1195652009-12-08 22:21:02 +0000836 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000837
838
jadmanski0afbb632008-06-06 21:10:57 +0000839 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700840 """
841 Looks through the afe_host_queue_entries for an aborted entry.
842
843 The aborted bit is set on an HQE in many ways, the most common
844 being when a user requests an abort through the frontend, which
845 results in an rpc from the afe to abort_host_queue_entries.
846 """
jamesrene7c65cb2010-06-08 20:38:10 +0000847 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000848 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700849 where='aborted=1 and complete=0'):
showardf4a2e502009-07-28 20:06:39 +0000850 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800851
852 # The task would have started off with both is_complete and
853 # is_active = False. Aborted tasks are neither active nor complete.
854 # For all currently active tasks this will happen through the agent,
855 # but we need to manually update the special tasks that haven't
856 # started yet, because they don't have agents.
857 models.SpecialTask.objects.filter(is_active=False,
858 queue_entry_id=entry.id).update(is_complete=True)
859
showardd3dc1992009-04-22 21:01:40 +0000860 for agent in self.get_agents_for_entry(entry):
861 agent.abort()
862 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000863 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700864 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000865 for job in jobs_to_stop:
866 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000867
868
beeps8bb1f7d2013-08-05 01:30:09 -0700869 def _find_aborted_special_tasks(self):
870 """
871 Find SpecialTasks that have been marked for abortion.
872
873 Poll the database looking for SpecialTasks that are active
874 and have been marked for abortion, then abort them.
875 """
876
877 # The completed and active bits are very important when it comes
878 # to scheduler correctness. The active bit is set through the prolog
879 # of a special task, and reset through the cleanup method of the
880 # SpecialAgentTask. The cleanup is called both through the abort and
881 # epilog. The complete bit is set in several places, and in general
882 # a hanging job will have is_active=1 is_complete=0, while a special
883 # task which completed will have is_active=0 is_complete=1. To check
884 # aborts we directly check active because the complete bit is set in
885 # several places, including the epilog of agent tasks.
886 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
887 is_aborted=True)
888 for task in aborted_tasks:
889 # There are 2 ways to get the agent associated with a task,
890 # through the host and through the hqe. A special task
891 # always needs a host, but doesn't always need a hqe.
892 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700893 if isinstance(agent.task, agent_task.SpecialAgentTask):
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000894
beeps8bb1f7d2013-08-05 01:30:09 -0700895 # The epilog preforms critical actions such as
896 # queueing the next SpecialTask, requeuing the
897 # hqe etc, however it doesn't actually kill the
898 # monitor process and set the 'done' bit. Epilogs
899 # assume that the job failed, and that the monitor
900 # process has already written an exit code. The
901 # done bit is a necessary condition for
902 # _handle_agents to schedule any more special
903 # tasks against the host, and it must be set
904 # in addition to is_active, is_complete and success.
905 agent.task.epilog()
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000906 agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700907
908
showard324bf812009-01-20 23:23:38 +0000909 def _can_start_agent(self, agent, num_started_this_cycle,
910 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000911 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000912 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000913 return True
914 # don't allow any nonzero-process agents to run after we've reached a
915 # limit (this avoids starvation of many-process agents)
916 if have_reached_limit:
917 return False
918 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000919 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000920 agent.task.owner_username,
921 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000922 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000923 return False
924 # if a single agent exceeds the per-cycle throttling, still allow it to
925 # run when it's the first agent in the cycle
926 if num_started_this_cycle == 0:
927 return True
928 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +0000929 if (num_started_this_cycle + agent.task.num_processes >
930 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000931 return False
932 return True
933
934
jadmanski0afbb632008-06-06 21:10:57 +0000935 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700936 """
937 Handles agents of the dispatcher.
938
939 Appropriate Agents are added to the dispatcher through
940 _schedule_running_host_queue_entries. These agents each
941 have a task. This method runs the agents task through
942 agent.tick() leading to:
943 agent.start
944 prolog -> AgentTasks prolog
945 For each queue entry:
946 sets host status/status to Running
947 set started_on in afe_host_queue_entries
948 run -> AgentTasks run
949 Creates PidfileRunMonitor
950 Queues the autoserv command line for this AgentTask
951 via the drone manager. These commands are executed
952 through the drone managers execute actions.
953 poll -> AgentTasks/BaseAgentTask poll
954 checks the monitors exit_code.
955 Executes epilog if task is finished.
956 Executes AgentTasks _finish_task
957 finish_task is usually responsible for setting the status
958 of the HQE/host, and updating it's active and complete fileds.
959
960 agent.is_done
961 Removed the agent from the dispatchers _agents queue.
962 Is_done checks the finished bit on the agent, that is
963 set based on the Agents task. During the agents poll
964 we check to see if the monitor process has exited in
965 it's finish method, and set the success member of the
966 task based on this exit code.
967 """
jadmanski0afbb632008-06-06 21:10:57 +0000968 num_started_this_cycle = 0
Prashanth Balasubramanian628bfcf2014-10-02 12:44:13 -0700969 num_finished_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000970 have_reached_limit = False
971 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -0700972 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +0000973 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -0700974 self._log_extra_msg('Processing Agent with Host Ids: %s and '
975 'queue_entry ids:%s' % (agent.host_ids,
976 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +0000977 if not agent.started:
showard324bf812009-01-20 23:23:38 +0000978 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000979 have_reached_limit):
980 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -0700981 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +0000982 continue
showardd1195652009-12-08 22:21:02 +0000983 num_started_this_cycle += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -0700984 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +0000985 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -0700986 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +0000987 if agent.is_done():
Prashanth Balasubramanian628bfcf2014-10-02 12:44:13 -0700988 num_finished_this_cycle += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -0700989 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +0000990 self.remove_agent(agent)
Prashanth Balasubramanian628bfcf2014-10-02 12:44:13 -0700991 stats.Gauge('scheduler.jobs_per_tick').send(
992 'agents_started', num_started_this_cycle)
993 stats.Gauge('scheduler.jobs_per_tick').send(
994 'agents_finished', num_finished_this_cycle)
Simran Basi3f6717d2012-09-13 15:21:22 -0700995 logging.info('%d running processes. %d added this cycle.',
996 _drone_manager.total_running_processes(),
997 num_started_this_cycle)
mbligh36768f02008-02-22 18:28:33 +0000998
999
showard29f7cd22009-04-29 21:16:24 +00001000 def _process_recurring_runs(self):
1001 recurring_runs = models.RecurringRun.objects.filter(
1002 start_date__lte=datetime.datetime.now())
1003 for rrun in recurring_runs:
1004 # Create job from template
1005 job = rrun.job
1006 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001007 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001008
1009 host_objects = info['hosts']
1010 one_time_hosts = info['one_time_hosts']
1011 metahost_objects = info['meta_hosts']
1012 dependencies = info['dependencies']
1013 atomic_group = info['atomic_group']
1014
1015 for host in one_time_hosts or []:
1016 this_host = models.Host.create_one_time_host(host.hostname)
1017 host_objects.append(this_host)
1018
1019 try:
1020 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001021 options=options,
showard29f7cd22009-04-29 21:16:24 +00001022 host_objects=host_objects,
1023 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001024 atomic_group=atomic_group)
1025
1026 except Exception, ex:
1027 logging.exception(ex)
1028 #TODO send email
1029
1030 if rrun.loop_count == 1:
1031 rrun.delete()
1032 else:
1033 if rrun.loop_count != 0: # if not infinite loop
1034 # calculate new start_date
1035 difference = datetime.timedelta(seconds=rrun.loop_period)
1036 rrun.start_date = rrun.start_date + difference
1037 rrun.loop_count -= 1
1038 rrun.save()
1039
1040
Simran Basia858a232012-08-21 11:04:37 -07001041SiteDispatcher = utils.import_site_class(
1042 __file__, 'autotest_lib.scheduler.site_monitor_db',
1043 'SiteDispatcher', BaseDispatcher)
1044
class Dispatcher(SiteDispatcher):
    """Concrete dispatcher: BaseDispatcher plus any site-specific overrides."""
    pass
1047
1048
mbligh36768f02008-02-22 18:28:33 +00001049class Agent(object):
showard77182562009-06-10 00:16:05 +00001050 """
Alex Miller47715eb2013-07-24 03:34:01 -07001051 An agent for use by the Dispatcher class to perform a task. An agent wraps
1052 around an AgentTask mainly to associate the AgentTask with the queue_entry
1053 and host ids.
showard77182562009-06-10 00:16:05 +00001054
1055 The following methods are required on all task objects:
1056 poll() - Called periodically to let the task check its status and
1057 update its internal state. If the task succeeded.
1058 is_done() - Returns True if the task is finished.
1059 abort() - Called when an abort has been requested. The task must
1060 set its aborted attribute to True if it actually aborted.
1061
1062 The following attributes are required on all task objects:
1063 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001064 success - bool, True if this task succeeded.
1065 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1066 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001067 """
1068
1069
showard418785b2009-11-23 20:19:59 +00001070 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001071 """
Alex Miller47715eb2013-07-24 03:34:01 -07001072 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001073 """
showard8cc058f2009-09-08 16:26:33 +00001074 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001075
showard77182562009-06-10 00:16:05 +00001076 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001077 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001078
showard8cc058f2009-09-08 16:26:33 +00001079 self.queue_entry_ids = task.queue_entry_ids
1080 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001081
showard8cc058f2009-09-08 16:26:33 +00001082 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001083 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001084
1085
jadmanski0afbb632008-06-06 21:10:57 +00001086 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001087 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001088 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001089 self.task.poll()
1090 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001091 self.finished = True
showardec113162008-05-08 00:52:49 +00001092
1093
jadmanski0afbb632008-06-06 21:10:57 +00001094 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001095 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001096
1097
showardd3dc1992009-04-22 21:01:40 +00001098 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001099 if self.task:
1100 self.task.abort()
1101 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001102 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001103 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001104
showardd3dc1992009-04-22 21:01:40 +00001105
beeps5e2bb4a2013-10-28 11:26:45 -07001106class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001107 """
1108 Common functionality for QueueTask and HostlessQueueTask
1109 """
1110 def __init__(self, queue_entries):
1111 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001112 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001113 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001114
1115
showard73ec0442009-02-07 02:05:20 +00001116 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001117 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001118
1119
jamesrenc44ae992010-02-19 00:12:54 +00001120 def _write_control_file(self, execution_path):
1121 control_path = _drone_manager.attach_file_to_execution(
1122 execution_path, self.job.control_file)
1123 return control_path
1124
1125
Aviv Keshet308e7362013-05-21 14:43:16 -07001126 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001127 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001128 execution_path = self.queue_entries[0].execution_path()
1129 control_path = self._write_control_file(execution_path)
1130 hostnames = ','.join(entry.host.hostname
1131 for entry in self.queue_entries
1132 if not entry.is_hostless())
1133
1134 execution_tag = self.queue_entries[0].execution_tag()
1135 params = _autoserv_command_line(
1136 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001137 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001138 _drone_manager.absolute_path(control_path)],
1139 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001140 if self.job.is_image_update_job():
1141 params += ['--image', self.job.update_image_path]
1142
jamesrenc44ae992010-02-19 00:12:54 +00001143 return params
showardd1195652009-12-08 22:21:02 +00001144
1145
1146 @property
1147 def num_processes(self):
1148 return len(self.queue_entries)
1149
1150
1151 @property
1152 def owner_username(self):
1153 return self.job.owner
1154
1155
1156 def _working_directory(self):
1157 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001158
1159
jadmanski0afbb632008-06-06 21:10:57 +00001160 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001161 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001162 keyval_dict = self.job.keyval_dict()
1163 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001164 group_name = self.queue_entries[0].get_group_name()
1165 if group_name:
1166 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001167 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001168 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001169 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001170 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001171
1172
showard35162b02009-03-03 02:17:30 +00001173 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001174 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001175 _drone_manager.write_lines_to_file(error_file_path,
1176 [_LOST_PROCESS_ERROR])
1177
1178
showardd3dc1992009-04-22 21:01:40 +00001179 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001180 if not self.monitor:
1181 return
1182
showardd9205182009-04-27 20:09:55 +00001183 self._write_job_finished()
1184
showard35162b02009-03-03 02:17:30 +00001185 if self.monitor.lost_process:
1186 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001187
jadmanskif7fa2cc2008-10-01 14:13:23 +00001188
showardcbd74612008-11-19 21:42:02 +00001189 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001190 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001191 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001192 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001193 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001194
1195
jadmanskif7fa2cc2008-10-01 14:13:23 +00001196 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001197 if not self.monitor or not self.monitor.has_process():
1198 return
1199
jadmanskif7fa2cc2008-10-01 14:13:23 +00001200 # build up sets of all the aborted_by and aborted_on values
1201 aborted_by, aborted_on = set(), set()
1202 for queue_entry in self.queue_entries:
1203 if queue_entry.aborted_by:
1204 aborted_by.add(queue_entry.aborted_by)
1205 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1206 aborted_on.add(t)
1207
1208 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001209 # TODO(showard): this conditional is now obsolete, we just need to leave
1210 # it in temporarily for backwards compatibility over upgrades. delete
1211 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001212 assert len(aborted_by) <= 1
1213 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001214 aborted_by_value = aborted_by.pop()
1215 aborted_on_value = max(aborted_on)
1216 else:
1217 aborted_by_value = 'autotest_system'
1218 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001219
showarda0382352009-02-11 23:36:43 +00001220 self._write_keyval_after_job("aborted_by", aborted_by_value)
1221 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001222
showardcbd74612008-11-19 21:42:02 +00001223 aborted_on_string = str(datetime.datetime.fromtimestamp(
1224 aborted_on_value))
1225 self._write_status_comment('Job aborted by %s on %s' %
1226 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001227
1228
    def abort(self):
        """Abort this queue task.

        Runs the base-class abort, then records abort metadata
        (aborted_by/aborted_on keyvals and a status.log comment) and
        performs the normal end-of-task cleanup.
        """
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()
showard21baa452008-10-21 00:08:39 +00001233
1234
    def epilog(self):
        """Normal-completion hook: base epilog plus end-of-task cleanup."""
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001238
1239
class QueueTask(AbstractQueueTask):
    """Queue task for a job that runs against one or more assigned hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Validate entry/host states and mark hosts as running/dirty."""
        allowed_hqe = (models.HostQueueEntry.Status.STARTING,
                       models.HostQueueEntry.Status.RUNNING)
        allowed_host = (models.Host.Status.PENDING,
                        models.Host.Status.RUNNING)
        self._check_queue_entry_statuses(self.queue_entries,
                                         allowed_hqe_statuses=allowed_hqe,
                                         allowed_host_statuses=allowed_host)

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            # Mark the host dirty so it gets cleaned up after the job.
            entry.host.update_field('dirty', 1)


    def _finish_task(self):
        super(QueueTask, self)._finish_task()

        # Move entries to log-gathering; hosts stay Running until
        # post-job tasks complete.
        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Extend the base autoserv invocation with job-repo-url checking."""
        base_invocation = super(QueueTask, self)._command_line()
        return base_invocation + ['--verify_job_repo_url']
1273
1274
class HostlessQueueTask(AbstractQueueTask):
    """Queue task for a job that requires no hosts (e.g. suite jobs)."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()

        # When a job is added to database, its initial status is always
        # Starting. In a scheduler tick, scheduler finds all jobs in Starting
        # status, check if any of them can be started. If scheduler hits some
        # limit, e.g., max_hostless_jobs_per_drone,
        # max_processes_started_per_cycle, scheduler will leave these jobs in
        # Starting status. Otherwise, the jobs' status will be changed to
        # Running, and an autoserv process will be started in drone for each of
        # these jobs.
        # If the entry is still in status Starting, the process has not started
        # yet. Therefore, there is no need to parse and collect log. Without
        # this check, exception will be raised by scheduler as execution_subdir
        # for this queue entry does not have a value yet.
        entry = self.queue_entries[0]
        if entry.status == models.HostQueueEntry.Status.STARTING:
            return
        entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001303
1304
# Script entry point: start the scheduler via main() (defined elsewhere in
# this module) when executed directly rather than imported.
if __name__ == '__main__':
    main()