blob: 4d42b824c8eea6b8e87b0f7cc941656ef53397b1 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Aviv Keshet225bdfe2013-03-05 10:10:08 -08002#pylint: disable-msg=C0111
mbligh36768f02008-02-22 18:28:33 +00003
4"""
5Autotest scheduler
6"""
showard909c7a62008-07-15 21:52:38 +00007
Dan Shif6c65bd2014-08-29 16:15:07 -07008import datetime
9import gc
10import logging
11import optparse
12import os
13import signal
14import sys
15import time
showard402934a2009-12-21 22:20:47 +000016
Alex Miller05d7b4c2013-03-04 07:49:38 -080017import common
showard21baa452008-10-21 00:08:39 +000018from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000019
20import django.db
21
Prashanth B0e960282014-05-13 19:38:28 -070022from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070023from autotest_lib.client.common_lib import utils
Michael Liangda8c60a2014-06-03 13:24:51 -070024from autotest_lib.client.common_lib.cros.graphite import stats
Prashanth B0e960282014-05-13 19:38:28 -070025from autotest_lib.frontend.afe import models, rpc_utils
beeps5e2bb4a2013-10-28 11:26:45 -070026from autotest_lib.scheduler import agent_task, drone_manager, drones
27from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
28from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070029from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070030from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070031from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000032from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080033from autotest_lib.scheduler import status_server, scheduler_config
Prashanth Bf66d51b2014-05-06 12:42:25 -070034from autotest_lib.scheduler import scheduler_lib
Aviv Keshet308e7362013-05-21 14:43:16 -070035from autotest_lib.server import autoserv_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080036
showard549afad2009-08-20 23:33:36 +000037BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
38PID_FILE_PREFIX = 'monitor_db'
mblighb090f142008-02-27 21:33:46 +000039
# Default results dir and autotest root; AUTOTEST_DIR overrides the root.
RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# 'in' replaces the deprecated dict.has_key() (removed in Python 3);
# behavior is identical.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

# Make server-side modules importable regardless of how we were launched.
if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000050
# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Database connection manager and connection; both assigned in initialize().
_db_manager = None
_db = None
# Set to True by handle_signal() to make the main loop exit after its tick.
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
# DroneManager singleton; assigned in initialize_globals().
_drone_manager = None
# When True the scheduler acquires/releases hosts inline with each tick
# (see BaseDispatcher.__init__ for how this selects the host scheduler).
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)
mbligh36768f02008-02-22 18:28:33 +000071
Eric Lie0493a42010-11-15 13:05:43 -080072def _parser_path_default(install_dir):
73 return os.path.join(install_dir, 'tko', 'parse')
# Allow a site-specific module to override where the parser binary lives;
# fall back to _parser_path_default when no site override is installed.
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
78
mbligh36768f02008-02-22 18:28:33 +000079
mbligh83c1e9e2009-05-01 23:10:41 +000080def _site_init_monitor_db_dummy():
81 return {}
82
83
def _verify_default_drone_set_exists():
    """Fail fast if drone sets are enabled without a configured default."""
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise scheduler_lib.SchedulerError(
            'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000089
90
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()
94
95
def main():
    """Run the scheduler, logging any exception that escapes.

    The scheduler pidfile is always removed on the way out, whether the
    process exits cleanly or dies.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # Normal termination; don't log a stack trace for it.
            raise
        except BaseException:
            # Explicit spelling of the previous bare 'except:': log
            # absolutely everything (including KeyboardInterrupt) at this
            # top-level boundary, then re-raise.
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000107
108
def main_without_exception_handling():
    """Parse options, run startup checks, then drive the dispatcher loop.

    Blocks until shutdown is requested (signal handler or status server),
    then flushes queued email, stops the status server and drone manager,
    and disconnects from the database.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    # TODO(dshi): change default to False after puppet change is landed in
    # production.
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=True)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Give a site-specific hook a chance to run before anything else.
    site_init = utils.import_site_function(__file__,
            "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
            _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        # Swap in the dummy autoserv; these globals are read elsewhere in
        # this module.
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop: one dispatcher tick per iteration until a shutdown is
        # requested via signal or via the status server.
        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except Exception:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Orderly teardown, even after an uncaught exception above.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000187
188
def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler: ask the main loop to exit after its tick."""
    logging.info("Shutdown request received.")
    global _shutdown
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000193
194
def initialize():
    """One-time process setup: pidfile, DB connection, signals, drones."""
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start if another scheduler instance already owns the pidfile.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point the scheduler at the stress-test database instead of prod.
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    # 'drones' is a comma-separated hostname list; results are copied back
    # to results_host.
    drones = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000229
230
def initialize_globals():
    """Create the module-level DroneManager singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
234
235
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - bool - Passed through to autoserv_run_job_command.
    """
    # Thin wrapper around autoserv_utils; _autoserv_directory is a module
    # global so tests can substitute it.
    return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
showard87ba02a2009-04-20 19:37:32 +0000253
254
Simran Basia858a232012-08-21 11:04:37 -0700255class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800256
257
    def __init__(self):
        """Read scheduler config values and build per-run bookkeeping state."""
        # Active Agent objects currently being driven by this dispatcher.
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db)
        # Indexes from host id / queue-entry id to the set of agents
        # touching them; maintained by _register/_unregister_agent_for_ids.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        # Config value is minutes; stored here as seconds.
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
289
mbligh36768f02008-02-22 18:28:33 +0000290
    def initialize(self, recover_hosts=True):
        """One-time startup recovery after the dispatcher is constructed.

        @param recover_hosts: if True, also attempt to recover dead hosts.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000300
301
Simran Basi0ec94dd2012-08-28 09:50:10 -0700302 def _log_tick_msg(self, msg):
303 if self._tick_debug:
304 logging.debug(msg)
305
306
Simran Basidef92872012-09-20 13:34:34 -0700307 def _log_extra_msg(self, msg):
308 if self._extra_debugging:
309 logging.debug(msg)
310
311
    def tick(self):
        """Run one scheduler iteration.

        Every major step is announced through _log_tick_msg() first, so the
        debug log shows where a slow tick spends its time. Step order is
        significant (e.g. refresh is triggered early and synced just before
        abort handling).
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
        _drone_manager.trigger_refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
        _drone_manager.sync_refresh()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000355
showard97aed502008-11-04 02:01:24 +0000356
    def _run_cleanup(self):
        """Offer both cleanups a chance to run via their *_maybe hooks."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000360
mbligh36768f02008-02-22 18:28:33 +0000361
showardf13a9e22009-12-18 22:54:09 +0000362 def _garbage_collection(self):
363 threshold_time = time.time() - self._seconds_between_garbage_stats
364 if threshold_time < self._last_garbage_stats_time:
365 # Don't generate these reports very often.
366 return
367
368 self._last_garbage_stats_time = time.time()
369 # Force a full level 0 collection (because we can, it doesn't hurt
370 # at this interval).
371 gc.collect()
372 logging.info('Logging garbage collector stats on tick %d.',
373 self._tick_count)
374 gc_stats._log_garbage_collector_stats()
375
376
showard170873e2009-01-07 00:22:26 +0000377 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
378 for object_id in object_ids:
379 agent_dict.setdefault(object_id, set()).add(agent)
380
381
382 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
383 for object_id in object_ids:
384 assert object_id in agent_dict
385 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700386 # If an ID has no more active agent associated, there is no need to
387 # keep it in the dictionary. Otherwise, scheduler will keep an
388 # unnecessarily big dictionary until being restarted.
389 if not agent_dict[object_id]:
390 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000391
392
showardd1195652009-12-08 22:21:02 +0000393 def add_agent_task(self, agent_task):
beeps8bb1f7d2013-08-05 01:30:09 -0700394 """
395 Creates and adds an agent to the dispatchers list.
396
397 In creating the agent we also pass on all the queue_entry_ids and
398 host_ids from the special agent task. For every agent we create, we
399 add it to 1. a dict against the queue_entry_ids given to it 2. A dict
400 against the host_ids given to it. So theoritically, a host can have any
401 number of agents associated with it, and each of them can have any
402 special agent task, though in practice we never see > 1 agent/task per
403 host at any time.
404
405 @param agent_task: A SpecialTask for the agent to manage.
406 """
showardd1195652009-12-08 22:21:02 +0000407 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000408 self._agents.append(agent)
409 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000410 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
411 self._register_agent_for_ids(self._queue_entry_agents,
412 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000413
showard170873e2009-01-07 00:22:26 +0000414
415 def get_agents_for_entry(self, queue_entry):
416 """
417 Find agents corresponding to the specified queue_entry.
418 """
showardd3dc1992009-04-22 21:01:40 +0000419 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000420
421
422 def host_has_agent(self, host):
423 """
424 Determine if there is currently an Agent present using this host.
425 """
426 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000427
428
jadmanski0afbb632008-06-06 21:10:57 +0000429 def remove_agent(self, agent):
430 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000431 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
432 agent)
433 self._unregister_agent_for_ids(self._queue_entry_agents,
434 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000435
436
showard8cc058f2009-09-08 16:26:33 +0000437 def _host_has_scheduled_special_task(self, host):
438 return bool(models.SpecialTask.objects.filter(host__id=host.id,
439 is_active=False,
440 is_complete=False))
441
442
    def _recover_processes(self):
        """Reattach to autoserv processes that survived a scheduler restart.

        Builds recovery AgentTasks, registers their pidfiles, refreshes the
        drone manager, then recovers tasks and pending/verifying entries
        before reverifying whatever hosts remain.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000455
showard170873e2009-01-07 00:22:26 +0000456
showardd1195652009-12-08 22:21:02 +0000457 def _create_recovery_agent_tasks(self):
458 return (self._get_queue_entry_agent_tasks()
459 + self._get_special_task_agent_tasks(is_active=True))
460
461
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified
        states, say parsing, and getting an AgentTask for it, like the
        FinalReparseTask, through _get_agent_task_for_queue_entry. Each queue
        entry can only have one agent task at a time, but there might be
        multiple queue entries in the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying
        # is handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)
        stats.Gauge('scheduler.jobs_per_tick').send(
                'running', len(queue_entries))

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            # One task may cover a whole synchronous entry group; record the
            # group so later iterations skip its members.
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000500
501
showardd1195652009-12-08 22:21:02 +0000502 def _get_special_task_agent_tasks(self, is_active=False):
503 special_tasks = models.SpecialTask.objects.filter(
504 is_active=is_active, is_complete=False)
505 return [self._get_agent_task_for_special_task(task)
506 for task in special_tasks]
507
508
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        @raises SchedulerError: if the entry's status maps to no AgentTask.
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        # Dispatch on status: Starting/Running -> queue task, then one
        # post-job task class per later status.
        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000534
535
536 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000537 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
538 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000539 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000540 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000541 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000542 if using_host:
showardd1195652009-12-08 22:21:02 +0000543 self._assert_host_has_no_agent(task_entry)
544
545
546 def _assert_host_has_no_agent(self, entry):
547 """
548 @param entry: a HostQueueEntry or a SpecialTask
549 """
550 if self.host_has_agent(entry.host):
551 agent = tuple(self._host_agents.get(entry.host.id))[0]
Prashanth B0e960282014-05-13 19:38:28 -0700552 raise scheduler_lib.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000553 'While scheduling %s, host %s already has a host agent %s'
554 % (entry, entry.host, agent.task))
555
556
557 def _get_agent_task_for_special_task(self, special_task):
558 """
559 Construct an AgentTask class to run the given SpecialTask and add it
560 to this dispatcher.
beeps8bb1f7d2013-08-05 01:30:09 -0700561
562 A special task is create through schedule_special_tasks, but only if
563 the host doesn't already have an agent. This happens through
564 add_agent_task. All special agent tasks are given a host on creation,
565 and a Null hqe. To create a SpecialAgentTask object, you need a
566 models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
567 object contains a hqe it's passed on to the special agent task, which
568 creates a HostQueueEntry and saves it as it's queue_entry.
569
showardd1195652009-12-08 22:21:02 +0000570 @param special_task: a models.SpecialTask instance
571 @returns an AgentTask to run this SpecialTask
572 """
573 self._assert_host_has_no_agent(special_task)
574
beeps5e2bb4a2013-10-28 11:26:45 -0700575 special_agent_task_classes = (prejob_task.CleanupTask,
576 prejob_task.VerifyTask,
577 prejob_task.RepairTask,
578 prejob_task.ResetTask,
579 prejob_task.ProvisionTask)
580
showardd1195652009-12-08 22:21:02 +0000581 for agent_task_class in special_agent_task_classes:
582 if agent_task_class.TASK_TYPE == special_task.task:
583 return agent_task_class(task=special_task)
584
Prashanth B0e960282014-05-13 19:38:28 -0700585 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800586 'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000587
588
589 def _register_pidfiles(self, agent_tasks):
590 for agent_task in agent_tasks:
591 agent_task.register_necessary_pidfiles()
592
593
    def _recover_tasks(self, agent_tasks):
        """Recover each task, then deal with processes nobody claimed.

        @param agent_tasks: AgentTasks from _create_recovery_agent_tasks.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            # A task that reattached to a process owns it; whatever is left
            # in 'orphans' afterwards belongs to no task.
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000604
605
    def _get_unassigned_entries(self, status):
        """Yield queue entries currently in `status` with no agent attached.

        @param status: a models.HostQueueEntry.Status value. Trusted input:
            it is interpolated directly into the WHERE clause.
        """
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                           % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry
613
614
showard6878e8b2009-07-20 22:37:45 +0000615 def _check_for_remaining_orphan_processes(self, orphans):
616 if not orphans:
617 return
618 subject = 'Unrecovered orphan autoserv processes remain'
619 message = '\n'.join(str(process) for process in orphans)
620 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000621
622 die_on_orphans = global_config.global_config.get_config_value(
623 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
624
625 if die_on_orphans:
626 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000627
showard170873e2009-01-07 00:22:26 +0000628
showard8cc058f2009-09-08 16:26:33 +0000629 def _recover_pending_entries(self):
630 for entry in self._get_unassigned_entries(
631 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000632 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000633 entry.on_pending()
634
635
showardb8900452009-10-12 20:31:01 +0000636 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000637 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000638 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
639 unrecovered_hqes = []
640 for queue_entry in queue_entries:
641 special_tasks = models.SpecialTask.objects.filter(
642 task__in=(models.SpecialTask.Task.CLEANUP,
643 models.SpecialTask.Task.VERIFY),
644 queue_entry__id=queue_entry.id,
645 is_complete=False)
646 if special_tasks.count() == 0:
647 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000648
showardb8900452009-10-12 20:31:01 +0000649 if unrecovered_hqes:
650 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700651 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000652 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000653 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000654
655
showard65db3932009-10-28 19:54:35 +0000656 def _schedule_special_tasks(self):
657 """
658 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700659
660 Special tasks include PreJobTasks like verify, reset and cleanup.
661 They are created through _schedule_new_jobs and associated with a hqe
662 This method translates SpecialTasks to the appropriate AgentTask and
663 adds them to the dispatchers agents list, so _handle_agents can execute
664 them.
showard65db3932009-10-28 19:54:35 +0000665 """
Prashanth B4ec98672014-05-15 10:44:54 -0700666 # When the host scheduler is responsible for acquisition we only want
667 # to run tasks with leased hosts. All hqe tasks will already have
668 # leased hosts, and we don't want to run frontend tasks till the host
669 # scheduler has vetted the assignment. Note that this doesn't include
670 # frontend tasks with hosts leased by other active hqes.
671 for task in self._job_query_manager.get_prioritized_special_tasks(
672 only_tasks_with_leased_hosts=not _inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000673 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000674 continue
showardd1195652009-12-08 22:21:02 +0000675 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000676
677
showard170873e2009-01-07 00:22:26 +0000678 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000679 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000680 # should never happen
showarded2afea2009-07-07 20:54:07 +0000681 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000682 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000683 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700684 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000685 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000686
687
jadmanski0afbb632008-06-06 21:10:57 +0000688 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000689 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700690 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000691 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000692 if self.host_has_agent(host):
693 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000694 continue
showard8cc058f2009-09-08 16:26:33 +0000695 if self._host_has_scheduled_special_task(host):
696 # host will have a special task scheduled on the next cycle
697 continue
showard170873e2009-01-07 00:22:26 +0000698 if print_message:
showardb18134f2009-03-20 20:52:18 +0000699 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000700 models.SpecialTask.objects.create(
701 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000702 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000703
704
jadmanski0afbb632008-06-06 21:10:57 +0000705 def _recover_hosts(self):
706 # recover "Repair Failed" hosts
707 message = 'Reverifying dead host %s'
708 self._reverify_hosts_where("status = 'Repair Failed'",
709 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000710
711
showard89f84db2009-03-12 20:39:13 +0000712 def _refresh_pending_queue_entries(self):
713 """
714 Lookup the pending HostQueueEntries and call our HostScheduler
715 refresh() method given that list. Return the list.
716
717 @returns A list of pending HostQueueEntries sorted in priority order.
718 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700719 queue_entries = self._job_query_manager.get_pending_queue_entries(
720 only_hostless=not _inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000721 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000722 return []
showard89f84db2009-03-12 20:39:13 +0000723 return queue_entries
724
725
showarda9545c02009-12-18 22:44:26 +0000726 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800727 """Schedule a hostless (suite) job.
728
729 @param queue_entry: The queue_entry representing the hostless job.
730 """
showarda9545c02009-12-18 22:44:26 +0000731 self.add_agent_task(HostlessQueueTask(queue_entry))
Jakob Juelichd615a1e2014-09-04 11:48:36 -0700732
733 # Need to set execution_subdir before setting the status:
734 # After a restart of the scheduler, agents will be restored for HQEs in
735 # Starting, Running, Gathering, Parsing or Archiving. To do this, the
736 # execution_subdir is needed. Therefore it must be set before entering
737 # one of these states.
738 # Otherwise, if the scheduler was interrupted between setting the status
739 # and the execution_subdir, upon it's restart restoring agents would
740 # fail.
741 # Is there a way to get a status in one of these states without going
742 # through this code? Following cases are possible:
743 # - If it's aborted before being started:
744 # active bit will be 0, so there's nothing to parse, it will just be
745 # set to completed by _find_aborting. Critical statuses are skipped.
746 # - If it's aborted or it fails after being started:
747 # It was started, so this code was executed.
748 queue_entry.update_field('execution_subdir', 'hostless')
jamesren47bd7372010-03-13 00:58:17 +0000749 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000750
751
beepscc9fc702013-12-02 12:45:38 -0800752 def _schedule_host_job(self, host, queue_entry):
753 """Schedules a job on the given host.
754
755 1. Assign the host to the hqe, if it isn't already assigned.
756 2. Create a SpecialAgentTask for the hqe.
757 3. Activate the hqe.
758
759 @param queue_entry: The job to schedule.
760 @param host: The host to schedule the job on.
761 """
762 if self.host_has_agent(host):
763 host_agent_task = list(self._host_agents.get(host.id))[0].task
764 subject = 'Host with agents assigned to an HQE'
765 message = ('HQE: %s assigned host %s, but the host has '
766 'agent: %s for queue_entry %s. The HQE '
beepsf7d35162014-02-13 16:42:13 -0800767 'will have to try and acquire a host next tick ' %
beepscc9fc702013-12-02 12:45:38 -0800768 (queue_entry, host.hostname, host_agent_task,
769 host_agent_task.queue_entry))
770 email_manager.manager.enqueue_notify_email(subject, message)
beepscc9fc702013-12-02 12:45:38 -0800771 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700772 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800773
774
showard89f84db2009-03-12 20:39:13 +0000775 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700776 """
777 Find any new HQEs and call schedule_pre_job_tasks for it.
778
779 This involves setting the status of the HQE and creating a row in the
780 db corresponding the the special task, through
781 scheduler_models._queue_special_task. The new db row is then added as
782 an agent to the dispatcher through _schedule_special_tasks and
783 scheduled for execution on the drone through _handle_agents.
784 """
showard89f84db2009-03-12 20:39:13 +0000785 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000786
beepscc9fc702013-12-02 12:45:38 -0800787 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700788 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700789 new_jobs_with_hosts = 0
790 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800791 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700792 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000793
beepscc9fc702013-12-02 12:45:38 -0800794 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000795 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000796 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700797 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000798 else:
beepscc9fc702013-12-02 12:45:38 -0800799 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700800 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700801
beepsb255fc52013-10-13 23:28:54 -0700802 stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
beepscc9fc702013-12-02 12:45:38 -0800803 if not host_jobs:
804 return
Prashanth Bf66d51b2014-05-06 12:42:25 -0700805 if not _inline_host_acquisition:
806 message = ('Found %s jobs that need hosts though '
807 '_inline_host_acquisition=%s. Will acquire hosts.' %
808 ([str(job) for job in host_jobs],
809 _inline_host_acquisition))
810 email_manager.manager.enqueue_notify_email(
811 'Processing unexpected host acquisition requests', message)
812 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
813 for host_assignment in jobs_with_hosts:
814 self._schedule_host_job(host_assignment.host, host_assignment.job)
815 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800816
beepsb255fc52013-10-13 23:28:54 -0700817 stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
818 stats.Gauge(key).send('new_jobs_without_hosts',
819 new_jobs_need_hosts - new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000820
821
showard8cc058f2009-09-08 16:26:33 +0000822 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700823 """
824 Adds agents to the dispatcher.
825
826 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
827 QueueTask for example, will have a job with a control file, and
828 the agent will have methods that poll, abort and check if the queue
829 task is finished. The dispatcher runs the agent_task, as well as
830 other agents in it's _agents member, through _handle_agents, by
831 calling the Agents tick().
832
833 This method creates an agent for each HQE in one of (starting, running,
834 gathering, parsing, archiving) states, and adds it to the dispatcher so
835 it is handled by _handle_agents.
836 """
showardd1195652009-12-08 22:21:02 +0000837 for agent_task in self._get_queue_entry_agent_tasks():
838 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000839
840
841 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000842 for entry in scheduler_models.HostQueueEntry.fetch(
843 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000844 task = entry.job.schedule_delayed_callback_task(entry)
845 if task:
showardd1195652009-12-08 22:21:02 +0000846 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000847
848
jadmanski0afbb632008-06-06 21:10:57 +0000849 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700850 """
851 Looks through the afe_host_queue_entries for an aborted entry.
852
853 The aborted bit is set on an HQE in many ways, the most common
854 being when a user requests an abort through the frontend, which
855 results in an rpc from the afe to abort_host_queue_entries.
856 """
jamesrene7c65cb2010-06-08 20:38:10 +0000857 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000858 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700859 where='aborted=1 and complete=0'):
showardf4a2e502009-07-28 20:06:39 +0000860 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800861
862 # The task would have started off with both is_complete and
863 # is_active = False. Aborted tasks are neither active nor complete.
864 # For all currently active tasks this will happen through the agent,
865 # but we need to manually update the special tasks that haven't
866 # started yet, because they don't have agents.
867 models.SpecialTask.objects.filter(is_active=False,
868 queue_entry_id=entry.id).update(is_complete=True)
869
showardd3dc1992009-04-22 21:01:40 +0000870 for agent in self.get_agents_for_entry(entry):
871 agent.abort()
872 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000873 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700874 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000875 for job in jobs_to_stop:
876 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000877
878
beeps8bb1f7d2013-08-05 01:30:09 -0700879 def _find_aborted_special_tasks(self):
880 """
881 Find SpecialTasks that have been marked for abortion.
882
883 Poll the database looking for SpecialTasks that are active
884 and have been marked for abortion, then abort them.
885 """
886
887 # The completed and active bits are very important when it comes
888 # to scheduler correctness. The active bit is set through the prolog
889 # of a special task, and reset through the cleanup method of the
890 # SpecialAgentTask. The cleanup is called both through the abort and
891 # epilog. The complete bit is set in several places, and in general
892 # a hanging job will have is_active=1 is_complete=0, while a special
893 # task which completed will have is_active=0 is_complete=1. To check
894 # aborts we directly check active because the complete bit is set in
895 # several places, including the epilog of agent tasks.
896 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
897 is_aborted=True)
898 for task in aborted_tasks:
899 # There are 2 ways to get the agent associated with a task,
900 # through the host and through the hqe. A special task
901 # always needs a host, but doesn't always need a hqe.
902 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700903 if isinstance(agent.task, agent_task.SpecialAgentTask):
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000904
beeps8bb1f7d2013-08-05 01:30:09 -0700905 # The epilog preforms critical actions such as
906 # queueing the next SpecialTask, requeuing the
907 # hqe etc, however it doesn't actually kill the
908 # monitor process and set the 'done' bit. Epilogs
909 # assume that the job failed, and that the monitor
910 # process has already written an exit code. The
911 # done bit is a necessary condition for
912 # _handle_agents to schedule any more special
913 # tasks against the host, and it must be set
914 # in addition to is_active, is_complete and success.
915 agent.task.epilog()
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000916 agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700917
918
showard324bf812009-01-20 23:23:38 +0000919 def _can_start_agent(self, agent, num_started_this_cycle,
920 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000921 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000922 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000923 return True
924 # don't allow any nonzero-process agents to run after we've reached a
925 # limit (this avoids starvation of many-process agents)
926 if have_reached_limit:
927 return False
928 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000929 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000930 agent.task.owner_username,
931 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000932 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000933 return False
934 # if a single agent exceeds the per-cycle throttling, still allow it to
935 # run when it's the first agent in the cycle
936 if num_started_this_cycle == 0:
937 return True
938 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +0000939 if (num_started_this_cycle + agent.task.num_processes >
940 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000941 return False
942 return True
943
944
jadmanski0afbb632008-06-06 21:10:57 +0000945 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700946 """
947 Handles agents of the dispatcher.
948
949 Appropriate Agents are added to the dispatcher through
950 _schedule_running_host_queue_entries. These agents each
951 have a task. This method runs the agents task through
952 agent.tick() leading to:
953 agent.start
954 prolog -> AgentTasks prolog
955 For each queue entry:
956 sets host status/status to Running
957 set started_on in afe_host_queue_entries
958 run -> AgentTasks run
959 Creates PidfileRunMonitor
960 Queues the autoserv command line for this AgentTask
961 via the drone manager. These commands are executed
962 through the drone managers execute actions.
963 poll -> AgentTasks/BaseAgentTask poll
964 checks the monitors exit_code.
965 Executes epilog if task is finished.
966 Executes AgentTasks _finish_task
967 finish_task is usually responsible for setting the status
968 of the HQE/host, and updating it's active and complete fileds.
969
970 agent.is_done
971 Removed the agent from the dispatchers _agents queue.
972 Is_done checks the finished bit on the agent, that is
973 set based on the Agents task. During the agents poll
974 we check to see if the monitor process has exited in
975 it's finish method, and set the success member of the
976 task based on this exit code.
977 """
jadmanski0afbb632008-06-06 21:10:57 +0000978 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000979 have_reached_limit = False
980 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -0700981 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +0000982 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -0700983 self._log_extra_msg('Processing Agent with Host Ids: %s and '
984 'queue_entry ids:%s' % (agent.host_ids,
985 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +0000986 if not agent.started:
showard324bf812009-01-20 23:23:38 +0000987 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000988 have_reached_limit):
989 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -0700990 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +0000991 continue
showardd1195652009-12-08 22:21:02 +0000992 num_started_this_cycle += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -0700993 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +0000994 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -0700995 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +0000996 if agent.is_done():
Simran Basidef92872012-09-20 13:34:34 -0700997 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +0000998 self.remove_agent(agent)
Simran Basi3f6717d2012-09-13 15:21:22 -0700999 logging.info('%d running processes. %d added this cycle.',
1000 _drone_manager.total_running_processes(),
1001 num_started_this_cycle)
mbligh36768f02008-02-22 18:28:33 +00001002
1003
showard29f7cd22009-04-29 21:16:24 +00001004 def _process_recurring_runs(self):
1005 recurring_runs = models.RecurringRun.objects.filter(
1006 start_date__lte=datetime.datetime.now())
1007 for rrun in recurring_runs:
1008 # Create job from template
1009 job = rrun.job
1010 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001011 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001012
1013 host_objects = info['hosts']
1014 one_time_hosts = info['one_time_hosts']
1015 metahost_objects = info['meta_hosts']
1016 dependencies = info['dependencies']
1017 atomic_group = info['atomic_group']
1018
1019 for host in one_time_hosts or []:
1020 this_host = models.Host.create_one_time_host(host.hostname)
1021 host_objects.append(this_host)
1022
1023 try:
1024 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001025 options=options,
showard29f7cd22009-04-29 21:16:24 +00001026 host_objects=host_objects,
1027 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001028 atomic_group=atomic_group)
1029
1030 except Exception, ex:
1031 logging.exception(ex)
1032 #TODO send email
1033
1034 if rrun.loop_count == 1:
1035 rrun.delete()
1036 else:
1037 if rrun.loop_count != 0: # if not infinite loop
1038 # calculate new start_date
1039 difference = datetime.timedelta(seconds=rrun.loop_period)
1040 rrun.start_date = rrun.start_date + difference
1041 rrun.loop_count -= 1
1042 rrun.save()
1043
1044
Simran Basia858a232012-08-21 11:04:37 -07001045SiteDispatcher = utils.import_site_class(
1046 __file__, 'autotest_lib.scheduler.site_monitor_db',
1047 'SiteDispatcher', BaseDispatcher)
1048
1049class Dispatcher(SiteDispatcher):
1050 pass
1051
1052
mbligh36768f02008-02-22 18:28:33 +00001053class Agent(object):
showard77182562009-06-10 00:16:05 +00001054 """
Alex Miller47715eb2013-07-24 03:34:01 -07001055 An agent for use by the Dispatcher class to perform a task. An agent wraps
1056 around an AgentTask mainly to associate the AgentTask with the queue_entry
1057 and host ids.
showard77182562009-06-10 00:16:05 +00001058
1059 The following methods are required on all task objects:
1060 poll() - Called periodically to let the task check its status and
1061 update its internal state. If the task succeeded.
1062 is_done() - Returns True if the task is finished.
1063 abort() - Called when an abort has been requested. The task must
1064 set its aborted attribute to True if it actually aborted.
1065
1066 The following attributes are required on all task objects:
1067 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001068 success - bool, True if this task succeeded.
1069 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1070 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001071 """
1072
1073
showard418785b2009-11-23 20:19:59 +00001074 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001075 """
Alex Miller47715eb2013-07-24 03:34:01 -07001076 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001077 """
showard8cc058f2009-09-08 16:26:33 +00001078 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001079
showard77182562009-06-10 00:16:05 +00001080 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001081 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001082
showard8cc058f2009-09-08 16:26:33 +00001083 self.queue_entry_ids = task.queue_entry_ids
1084 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001085
showard8cc058f2009-09-08 16:26:33 +00001086 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001087 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001088
1089
jadmanski0afbb632008-06-06 21:10:57 +00001090 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001091 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001092 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001093 self.task.poll()
1094 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001095 self.finished = True
showardec113162008-05-08 00:52:49 +00001096
1097
jadmanski0afbb632008-06-06 21:10:57 +00001098 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001099 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001100
1101
showardd3dc1992009-04-22 21:01:40 +00001102 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001103 if self.task:
1104 self.task.abort()
1105 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001106 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001107 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001108
showardd3dc1992009-04-22 21:01:40 +00001109
beeps5e2bb4a2013-10-28 11:26:45 -07001110class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001111 """
1112 Common functionality for QueueTask and HostlessQueueTask
1113 """
1114 def __init__(self, queue_entries):
1115 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001116 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001117 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001118
1119
showard73ec0442009-02-07 02:05:20 +00001120 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001121 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001122
1123
jamesrenc44ae992010-02-19 00:12:54 +00001124 def _write_control_file(self, execution_path):
1125 control_path = _drone_manager.attach_file_to_execution(
1126 execution_path, self.job.control_file)
1127 return control_path
1128
1129
Aviv Keshet308e7362013-05-21 14:43:16 -07001130 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001131 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001132 execution_path = self.queue_entries[0].execution_path()
1133 control_path = self._write_control_file(execution_path)
1134 hostnames = ','.join(entry.host.hostname
1135 for entry in self.queue_entries
1136 if not entry.is_hostless())
1137
1138 execution_tag = self.queue_entries[0].execution_tag()
1139 params = _autoserv_command_line(
1140 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001141 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001142 _drone_manager.absolute_path(control_path)],
1143 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001144 if self.job.is_image_update_job():
1145 params += ['--image', self.job.update_image_path]
1146
jamesrenc44ae992010-02-19 00:12:54 +00001147 return params
showardd1195652009-12-08 22:21:02 +00001148
1149
1150 @property
1151 def num_processes(self):
1152 return len(self.queue_entries)
1153
1154
1155 @property
1156 def owner_username(self):
1157 return self.job.owner
1158
1159
1160 def _working_directory(self):
1161 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001162
1163
jadmanski0afbb632008-06-06 21:10:57 +00001164 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001165 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001166 keyval_dict = self.job.keyval_dict()
1167 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001168 group_name = self.queue_entries[0].get_group_name()
1169 if group_name:
1170 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001171 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001172 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001173 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001174 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001175
1176
showard35162b02009-03-03 02:17:30 +00001177 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001178 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001179 _drone_manager.write_lines_to_file(error_file_path,
1180 [_LOST_PROCESS_ERROR])
1181
1182
showardd3dc1992009-04-22 21:01:40 +00001183 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001184 if not self.monitor:
1185 return
1186
showardd9205182009-04-27 20:09:55 +00001187 self._write_job_finished()
1188
showard35162b02009-03-03 02:17:30 +00001189 if self.monitor.lost_process:
1190 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001191
jadmanskif7fa2cc2008-10-01 14:13:23 +00001192
showardcbd74612008-11-19 21:42:02 +00001193 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001194 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001195 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001196 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001197 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001198
1199
jadmanskif7fa2cc2008-10-01 14:13:23 +00001200 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001201 if not self.monitor or not self.monitor.has_process():
1202 return
1203
jadmanskif7fa2cc2008-10-01 14:13:23 +00001204 # build up sets of all the aborted_by and aborted_on values
1205 aborted_by, aborted_on = set(), set()
1206 for queue_entry in self.queue_entries:
1207 if queue_entry.aborted_by:
1208 aborted_by.add(queue_entry.aborted_by)
1209 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1210 aborted_on.add(t)
1211
1212 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001213 # TODO(showard): this conditional is now obsolete, we just need to leave
1214 # it in temporarily for backwards compatibility over upgrades. delete
1215 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001216 assert len(aborted_by) <= 1
1217 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001218 aborted_by_value = aborted_by.pop()
1219 aborted_on_value = max(aborted_on)
1220 else:
1221 aborted_by_value = 'autotest_system'
1222 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001223
showarda0382352009-02-11 23:36:43 +00001224 self._write_keyval_after_job("aborted_by", aborted_by_value)
1225 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001226
showardcbd74612008-11-19 21:42:02 +00001227 aborted_on_string = str(datetime.datetime.fromtimestamp(
1228 aborted_on_value))
1229 self._write_status_comment('Job aborted by %s on %s' %
1230 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001231
1232
jadmanski0afbb632008-06-06 21:10:57 +00001233 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001234 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001235 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001236 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001237
1238
jadmanski0afbb632008-06-06 21:10:57 +00001239 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001240 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001241 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001242
1243
class QueueTask(AbstractQueueTask):
    """Queue task for job entries that run autoserv against real hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Validate entry/host statuses, then mark each host as running."""
        hqe_statuses = (models.HostQueueEntry.Status.STARTING,
                        models.HostQueueEntry.Status.RUNNING)
        host_statuses = (models.Host.Status.PENDING,
                         models.Host.Status.RUNNING)
        self._check_queue_entry_statuses(self.queue_entries,
                                         allowed_hqe_statuses=hqe_statuses,
                                         allowed_host_statuses=host_statuses)

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            # Write host keyvals before flipping the host to Running.
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)


    def _finish_task(self):
        """Move entries to Gathering; hosts stay Running for log collection."""
        super(QueueTask, self)._finish_task()

        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Extend the base autoserv invocation to verify the job repo url."""
        return (super(QueueTask, self)._command_line()
                + ['--verify_job_repo_url'])
1277
1278
Dan Shi1a189052013-10-28 14:41:35 -07001279class HostlessQueueTask(AbstractQueueTask):
mbligh4608b002010-01-05 18:22:35 +00001280 def __init__(self, queue_entry):
1281 super(HostlessQueueTask, self).__init__([queue_entry])
1282 self.queue_entry_ids = [queue_entry.id]
1283
1284
1285 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001286 super(HostlessQueueTask, self).prolog()
1287
1288
mbligh4608b002010-01-05 18:22:35 +00001289 def _finish_task(self):
1290 super(HostlessQueueTask, self)._finish_task()
Dan Shi76af8022013-10-19 01:59:49 -07001291
1292 # When a job is added to database, its initial status is always
1293 # Starting. In a scheduler tick, scheduler finds all jobs in Starting
1294 # status, check if any of them can be started. If scheduler hits some
Alex Millerac189f32014-06-23 13:55:23 -07001295 # limit, e.g., max_hostless_jobs_per_drone,
1296 # max_processes_started_per_cycle, scheduler will leave these jobs in
1297 # Starting status. Otherwise, the jobs' status will be changed to
1298 # Running, and an autoserv process will be started in drone for each of
1299 # these jobs.
Dan Shi76af8022013-10-19 01:59:49 -07001300 # If the entry is still in status Starting, the process has not started
1301 # yet. Therefore, there is no need to parse and collect log. Without
1302 # this check, exception will be raised by scheduler as execution_subdir
1303 # for this queue entry does not have a value yet.
1304 hqe = self.queue_entries[0]
1305 if hqe.status != models.HostQueueEntry.Status.STARTING:
1306 hqe.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001307
1308
mbligh36768f02008-02-22 18:28:33 +00001309if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00001310 main()