blob: 6a2fee946d97320a7aa3d2b2ed4ab70af717c045 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Aviv Keshet225bdfe2013-03-05 10:10:08 -08002#pylint: disable-msg=C0111
mbligh36768f02008-02-22 18:28:33 +00003
4"""
5Autotest scheduler
6"""
showard909c7a62008-07-15 21:52:38 +00007
Dan Shif6c65bd2014-08-29 16:15:07 -07008import datetime
9import gc
10import logging
11import optparse
12import os
13import signal
14import sys
15import time
showard402934a2009-12-21 22:20:47 +000016
Alex Miller05d7b4c2013-03-04 07:49:38 -080017import common
showard21baa452008-10-21 00:08:39 +000018from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000019
20import django.db
21
Prashanth B0e960282014-05-13 19:38:28 -070022from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070023from autotest_lib.client.common_lib import utils
Michael Liangda8c60a2014-06-03 13:24:51 -070024from autotest_lib.client.common_lib.cros.graphite import stats
Prashanth B0e960282014-05-13 19:38:28 -070025from autotest_lib.frontend.afe import models, rpc_utils
beeps5e2bb4a2013-10-28 11:26:45 -070026from autotest_lib.scheduler import agent_task, drone_manager, drones
27from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
28from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070029from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070030from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070031from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000032from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080033from autotest_lib.scheduler import status_server, scheduler_config
Prashanth Bf66d51b2014-05-06 12:42:25 -070034from autotest_lib.scheduler import scheduler_lib
Aviv Keshet308e7362013-05-21 14:43:16 -070035from autotest_lib.server import autoserv_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080036
# Pid file prefixes used with utils.write_pid()/program_is_alive() for the
# scheduler itself and for its babysitter wrapper process.
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the environment to override the install location. The 'in' operator
# replaces dict.has_key(), which is non-idiomatic and removed in Python 3.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000050
# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level state, populated in initialize()/initialize_globals().
_db_manager = None
_db = None
# Set to True by handle_signal(); the main loop polls it to exit cleanly.
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None
# When True (default) the scheduler acquires/releases hosts for jobs inline
# with the tick; see BaseDispatcher.__init__ for how this picks the
# host-scheduler implementation.
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)
70
mbligh36768f02008-02-22 18:28:33 +000071
def _parser_path_default(install_dir):
    """Return the default path to the tko results parser under install_dir."""
    return os.path.join(install_dir, 'tko', 'parse')

# Allow a site-specific hook to override where the parser binary lives;
# fall back to the default <install_dir>/tko/parse location.
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
78
mbligh36768f02008-02-22 18:28:33 +000079
mbligh83c1e9e2009-05-01 23:10:41 +000080def _site_init_monitor_db_dummy():
81 return {}
82
83
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled without a default set."""
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise scheduler_lib.SchedulerError(
            'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000089
90
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler.

    @raises scheduler_lib.SchedulerError: if the drone-set config is invalid.
    """
    _verify_default_drone_set_exists()
94
95
def main():
    """Entry point: run the scheduler, log escaping exceptions, and always
    remove our pid file on the way out."""
    try:
        main_without_exception_handling()
    except SystemExit:
        # Deliberate exits (e.g. bad args, disabled scheduler) pass through.
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000107
108
def main_without_exception_handling():
    """Parse options, validate configuration, and run the dispatch loop.

    Sets up logging, checks production/database settings, chdirs into the
    results directory, starts the status server, then ticks the Dispatcher
    until a shutdown is requested. Cleanup (emails, status server, drones,
    DB) always runs before returning.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        # Swap in the dummy autoserv; module globals are read by the tasks.
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop: one tick per iteration until a signal or the status
        # server requests shutdown.
        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except Exception:
        email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000185
186
def handle_signal(signum, frame):
    """Signal handler for SIGINT/SIGTERM: request a clean shutdown.

    Only flips the module-level _shutdown flag; the main loop exits after
    finishing its current tick.

    @param signum: signal number (unused).
    @param frame: current stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000191
192
def initialize():
    """One-time scheduler startup: pidfile, test overrides, DB, signals,
    drones.

    Exits the process if another monitor_db instance already holds the
    pidfile.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point the scheduler at the stress-test database instead of the
        # configured one.
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    # Drones are the machines that run autoserv processes on the scheduler's
    # behalf; results_host is where results are copied back to.
    drones = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000227
228
def initialize_globals():
    """Bind the module-level _drone_manager to the process-wide singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
232
233
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line for a job or queue entry.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @returns The autoserv command line as a list of executable + parameters.
    """
    command = autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
    return command
showard87ba02a2009-04-20 19:37:32 +0000251
252
Simran Basia858a232012-08-21 11:04:37 -0700253class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800254
255
    def __init__(self):
        """Read scheduler config and set up per-run bookkeeping structures."""
        # Active Agent objects currently being dispatched.
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db)
        # Maps host id -> set of Agents, and queue entry id -> set of Agents.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
287
mbligh36768f02008-02-22 18:28:33 +0000288
    def initialize(self, recover_hosts=True):
        """Prepare cleanups and recover state left by the previous scheduler.

        @param recover_hosts: if True, also re-verify hosts left in flux;
                process recovery always happens regardless.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000298
299
Simran Basi0ec94dd2012-08-28 09:50:10 -0700300 def _log_tick_msg(self, msg):
301 if self._tick_debug:
302 logging.debug(msg)
303
304
Simran Basidef92872012-09-20 13:34:34 -0700305 def _log_extra_msg(self, msg):
306 if self._extra_debugging:
307 logging.debug(msg)
308
309
    def tick(self):
        """
        Run one scheduler iteration.

        Each major step is preceded by a _log_tick_msg call so that, with
        tick_debug enabled, the logs show where a tick spends its time.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
        _drone_manager.trigger_refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
        _drone_manager.sync_refresh()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000353
showard97aed502008-11-04 02:01:24 +0000354
    def _run_cleanup(self):
        """Give both periodic cleanups a chance to run (they self-throttle)."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000358
mbligh36768f02008-02-22 18:28:33 +0000359
showardf13a9e22009-12-18 22:54:09 +0000360 def _garbage_collection(self):
361 threshold_time = time.time() - self._seconds_between_garbage_stats
362 if threshold_time < self._last_garbage_stats_time:
363 # Don't generate these reports very often.
364 return
365
366 self._last_garbage_stats_time = time.time()
367 # Force a full level 0 collection (because we can, it doesn't hurt
368 # at this interval).
369 gc.collect()
370 logging.info('Logging garbage collector stats on tick %d.',
371 self._tick_count)
372 gc_stats._log_garbage_collector_stats()
373
374
showard170873e2009-01-07 00:22:26 +0000375 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
376 for object_id in object_ids:
377 agent_dict.setdefault(object_id, set()).add(agent)
378
379
380 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
381 for object_id in object_ids:
382 assert object_id in agent_dict
383 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700384 # If an ID has no more active agent associated, there is no need to
385 # keep it in the dictionary. Otherwise, scheduler will keep an
386 # unnecessarily big dictionary until being restarted.
387 if not agent_dict[object_id]:
388 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000389
390
    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatchers list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to 1. a dict against the queue_entry_ids given to it 2. A dict
        against the host_ids given to it. So theoretically, a host can have
        any number of agents associated with it, and each of them can have any
        special agent task, though in practice we never see > 1 agent/task per
        host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        # Back-pointer so the agent can remove itself on completion.
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000411
showard170873e2009-01-07 00:22:26 +0000412
413 def get_agents_for_entry(self, queue_entry):
414 """
415 Find agents corresponding to the specified queue_entry.
416 """
showardd3dc1992009-04-22 21:01:40 +0000417 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000418
419
420 def host_has_agent(self, host):
421 """
422 Determine if there is currently an Agent present using this host.
423 """
424 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000425
426
jadmanski0afbb632008-06-06 21:10:57 +0000427 def remove_agent(self, agent):
428 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000429 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
430 agent)
431 self._unregister_agent_for_ids(self._queue_entry_agents,
432 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000433
434
showard8cc058f2009-09-08 16:26:33 +0000435 def _host_has_scheduled_special_task(self, host):
436 return bool(models.SpecialTask.objects.filter(host__id=host.id,
437 is_active=False,
438 is_complete=False))
439
440
    def _recover_processes(self):
        """Recover scheduler state after a restart.

        Rebuilds AgentTasks for work that was in flight when the previous
        scheduler died, reconciles them against the processes the drones are
        actually running, and cleans up anything left over.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000453
showard170873e2009-01-07 00:22:26 +0000454
showardd1195652009-12-08 22:21:02 +0000455 def _create_recovery_agent_tasks(self):
456 return (self._get_queue_entry_agent_tasks()
457 + self._get_special_task_agent_tasks(is_active=True))
458
459
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)
        stats.Gauge('scheduler.jobs_per_tick').send(
                'running', len(queue_entries))

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            # A synchronous (multi-host) job yields one task covering the
            # whole group, so mark every entry it manages as consumed.
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000498
499
showardd1195652009-12-08 22:21:02 +0000500 def _get_special_task_agent_tasks(self, is_active=False):
501 special_tasks = models.SpecialTask.objects.filter(
502 is_active=is_active, is_complete=False)
503 return [self._get_agent_task_for_special_task(task)
504 for task in special_tasks]
505
506
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        @raises scheduler_lib.SchedulerError: if the entry's status has no
                corresponding AgentTask.
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000532
533
534 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000535 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
536 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000537 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000538 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000539 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000540 if using_host:
showardd1195652009-12-08 22:21:02 +0000541 self._assert_host_has_no_agent(task_entry)
542
543
    def _assert_host_has_no_agent(self, entry):
        """
        Ensure the entry's host is not already claimed by another agent.

        @param entry: a HostQueueEntry or a SpecialTask
        @raises scheduler_lib.SchedulerError: if the host already has an
                agent registered.
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise scheduler_lib.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))
553
554
    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a
        SpecialAgentTask object contains a hqe it's passed on to the special
        agent task, which creates a HostQueueEntry and saves it as its
        queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        @raises scheduler_lib.SchedulerError: if no AgentTask class matches
                the task's type.
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        # Dispatch on the TASK_TYPE each AgentTask class declares.
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise scheduler_lib.SchedulerError(
                'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000585
586
587 def _register_pidfiles(self, agent_tasks):
588 for agent_task in agent_tasks:
589 agent_task.register_necessary_pidfiles()
590
591
    def _recover_tasks(self, agent_tasks):
        """Re-attach recovered tasks to their processes and adopt them.

        Any autoserv process that no recovered task claims is an orphan;
        those are reported (and may abort the scheduler, depending on
        configuration).

        @param agent_tasks: AgentTasks built by _create_recovery_agent_tasks.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)
601 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000602
603
    def _get_unassigned_entries(self, status):
        """Yield queue entries in `status` that no agent is handling yet."""
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                           % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry
611
612
showard6878e8b2009-07-20 22:37:45 +0000613 def _check_for_remaining_orphan_processes(self, orphans):
614 if not orphans:
615 return
616 subject = 'Unrecovered orphan autoserv processes remain'
617 message = '\n'.join(str(process) for process in orphans)
618 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000619
620 die_on_orphans = global_config.global_config.get_config_value(
621 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
622
623 if die_on_orphans:
624 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000625
showard170873e2009-01-07 00:22:26 +0000626
showard8cc058f2009-09-08 16:26:33 +0000627 def _recover_pending_entries(self):
628 for entry in self._get_unassigned_entries(
629 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000630 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000631 entry.on_pending()
632
633
showardb8900452009-10-12 20:31:01 +0000634 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000635 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000636 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
637 unrecovered_hqes = []
638 for queue_entry in queue_entries:
639 special_tasks = models.SpecialTask.objects.filter(
640 task__in=(models.SpecialTask.Task.CLEANUP,
641 models.SpecialTask.Task.VERIFY),
642 queue_entry__id=queue_entry.id,
643 is_complete=False)
644 if special_tasks.count() == 0:
645 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000646
showardb8900452009-10-12 20:31:01 +0000647 if unrecovered_hqes:
648 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700649 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000650 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000651 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000652
653
showard65db3932009-10-28 19:54:35 +0000654 def _schedule_special_tasks(self):
655 """
656 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700657
658 Special tasks include PreJobTasks like verify, reset and cleanup.
659 They are created through _schedule_new_jobs and associated with a hqe
660 This method translates SpecialTasks to the appropriate AgentTask and
661 adds them to the dispatchers agents list, so _handle_agents can execute
662 them.
showard65db3932009-10-28 19:54:35 +0000663 """
Prashanth B4ec98672014-05-15 10:44:54 -0700664 # When the host scheduler is responsible for acquisition we only want
665 # to run tasks with leased hosts. All hqe tasks will already have
666 # leased hosts, and we don't want to run frontend tasks till the host
667 # scheduler has vetted the assignment. Note that this doesn't include
668 # frontend tasks with hosts leased by other active hqes.
669 for task in self._job_query_manager.get_prioritized_special_tasks(
670 only_tasks_with_leased_hosts=not _inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000671 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000672 continue
showardd1195652009-12-08 22:21:02 +0000673 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000674
675
showard170873e2009-01-07 00:22:26 +0000676 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000677 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000678 # should never happen
showarded2afea2009-07-07 20:54:07 +0000679 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000680 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000681 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700682 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000683 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000684
685
jadmanski0afbb632008-06-06 21:10:57 +0000686 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000687 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700688 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000689 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000690 if self.host_has_agent(host):
691 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000692 continue
showard8cc058f2009-09-08 16:26:33 +0000693 if self._host_has_scheduled_special_task(host):
694 # host will have a special task scheduled on the next cycle
695 continue
showard170873e2009-01-07 00:22:26 +0000696 if print_message:
showardb18134f2009-03-20 20:52:18 +0000697 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000698 models.SpecialTask.objects.create(
699 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000700 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000701
702
jadmanski0afbb632008-06-06 21:10:57 +0000703 def _recover_hosts(self):
704 # recover "Repair Failed" hosts
705 message = 'Reverifying dead host %s'
706 self._reverify_hosts_where("status = 'Repair Failed'",
707 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000708
709
    def _refresh_pending_queue_entries(self):
        """
        Fetch pending HostQueueEntries from the job query manager.

        When the host scheduler owns host acquisition (i.e.
        _inline_host_acquisition is False) only hostless entries are
        fetched; hosted entries get their hosts assigned out of band.

        @returns A list of pending HostQueueEntries sorted in priority
                order (may be empty).
        """
        queue_entries = self._job_query_manager.get_pending_queue_entries(
                only_hostless=not _inline_host_acquisition)
        if not queue_entries:
            return []
        return queue_entries
722
723
    def _schedule_hostless_job(self, queue_entry):
        """Schedule a hostless (suite) job.

        Creates a HostlessQueueTask agent for the entry, then moves the
        entry to Starting. Statement order below is load-bearing; see the
        comments.

        @param queue_entry: The queue_entry representing the hostless job.
        """
        self.add_agent_task(HostlessQueueTask(queue_entry))

        # Need to set execution_subdir before setting the status:
        # After a restart of the scheduler, agents will be restored for HQEs in
        # Starting, Running, Gathering, Parsing or Archiving. To do this, the
        # execution_subdir is needed. Therefore it must be set before entering
        # one of these states.
        # Otherwise, if the scheduler was interrupted between setting the status
        # and the execution_subdir, upon its restart restoring agents would
        # fail.
        # Is there a way to get a status in one of these states without going
        # through this code? Following cases are possible:
        # - If it's aborted before being started:
        #   active bit will be 0, so there's nothing to parse, it will just be
        #   set to completed by _find_aborting. Critical statuses are skipped.
        # - If it's aborted or it fails after being started:
        #   It was started, so this code was executed.
        queue_entry.update_field('execution_subdir', 'hostless')
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000748
749
beepscc9fc702013-12-02 12:45:38 -0800750 def _schedule_host_job(self, host, queue_entry):
751 """Schedules a job on the given host.
752
753 1. Assign the host to the hqe, if it isn't already assigned.
754 2. Create a SpecialAgentTask for the hqe.
755 3. Activate the hqe.
756
757 @param queue_entry: The job to schedule.
758 @param host: The host to schedule the job on.
759 """
760 if self.host_has_agent(host):
761 host_agent_task = list(self._host_agents.get(host.id))[0].task
762 subject = 'Host with agents assigned to an HQE'
763 message = ('HQE: %s assigned host %s, but the host has '
764 'agent: %s for queue_entry %s. The HQE '
beepsf7d35162014-02-13 16:42:13 -0800765 'will have to try and acquire a host next tick ' %
beepscc9fc702013-12-02 12:45:38 -0800766 (queue_entry, host.hostname, host_agent_task,
767 host_agent_task.queue_entry))
768 email_manager.manager.enqueue_notify_email(subject, message)
beepscc9fc702013-12-02 12:45:38 -0800769 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700770 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800771
772
showard89f84db2009-03-12 20:39:13 +0000773 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700774 """
775 Find any new HQEs and call schedule_pre_job_tasks for it.
776
777 This involves setting the status of the HQE and creating a row in the
778 db corresponding the the special task, through
779 scheduler_models._queue_special_task. The new db row is then added as
780 an agent to the dispatcher through _schedule_special_tasks and
781 scheduled for execution on the drone through _handle_agents.
782 """
showard89f84db2009-03-12 20:39:13 +0000783 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000784
beepscc9fc702013-12-02 12:45:38 -0800785 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700786 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700787 new_jobs_with_hosts = 0
788 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800789 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700790 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000791
beepscc9fc702013-12-02 12:45:38 -0800792 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000793 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000794 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700795 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000796 else:
beepscc9fc702013-12-02 12:45:38 -0800797 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700798 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700799
beepsb255fc52013-10-13 23:28:54 -0700800 stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
beepscc9fc702013-12-02 12:45:38 -0800801 if not host_jobs:
802 return
Prashanth Bf66d51b2014-05-06 12:42:25 -0700803 if not _inline_host_acquisition:
804 message = ('Found %s jobs that need hosts though '
805 '_inline_host_acquisition=%s. Will acquire hosts.' %
806 ([str(job) for job in host_jobs],
807 _inline_host_acquisition))
808 email_manager.manager.enqueue_notify_email(
809 'Processing unexpected host acquisition requests', message)
810 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
811 for host_assignment in jobs_with_hosts:
812 self._schedule_host_job(host_assignment.host, host_assignment.job)
813 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800814
beepsb255fc52013-10-13 23:28:54 -0700815 stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
816 stats.Gauge(key).send('new_jobs_without_hosts',
817 new_jobs_need_hosts - new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000818
819
showard8cc058f2009-09-08 16:26:33 +0000820 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700821 """
822 Adds agents to the dispatcher.
823
824 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
825 QueueTask for example, will have a job with a control file, and
826 the agent will have methods that poll, abort and check if the queue
827 task is finished. The dispatcher runs the agent_task, as well as
828 other agents in it's _agents member, through _handle_agents, by
829 calling the Agents tick().
830
831 This method creates an agent for each HQE in one of (starting, running,
832 gathering, parsing, archiving) states, and adds it to the dispatcher so
833 it is handled by _handle_agents.
834 """
showardd1195652009-12-08 22:21:02 +0000835 for agent_task in self._get_queue_entry_agent_tasks():
836 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000837
838
839 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000840 for entry in scheduler_models.HostQueueEntry.fetch(
841 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000842 task = entry.job.schedule_delayed_callback_task(entry)
843 if task:
showardd1195652009-12-08 22:21:02 +0000844 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000845
846
jadmanski0afbb632008-06-06 21:10:57 +0000847 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700848 """
849 Looks through the afe_host_queue_entries for an aborted entry.
850
851 The aborted bit is set on an HQE in many ways, the most common
852 being when a user requests an abort through the frontend, which
853 results in an rpc from the afe to abort_host_queue_entries.
854 """
jamesrene7c65cb2010-06-08 20:38:10 +0000855 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000856 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700857 where='aborted=1 and complete=0'):
showardf4a2e502009-07-28 20:06:39 +0000858 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800859
860 # The task would have started off with both is_complete and
861 # is_active = False. Aborted tasks are neither active nor complete.
862 # For all currently active tasks this will happen through the agent,
863 # but we need to manually update the special tasks that haven't
864 # started yet, because they don't have agents.
865 models.SpecialTask.objects.filter(is_active=False,
866 queue_entry_id=entry.id).update(is_complete=True)
867
showardd3dc1992009-04-22 21:01:40 +0000868 for agent in self.get_agents_for_entry(entry):
869 agent.abort()
870 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000871 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700872 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000873 for job in jobs_to_stop:
874 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000875
876
    def _find_aborted_special_tasks(self):
        """
        Find SpecialTasks that have been marked for abortion.

        Poll the database looking for SpecialTasks that are active
        and have been marked for abortion, then abort them.
        """

        # The completed and active bits are very important when it comes
        # to scheduler correctness. The active bit is set through the prolog
        # of a special task, and reset through the cleanup method of the
        # SpecialAgentTask. The cleanup is called both through the abort and
        # epilog. The complete bit is set in several places, and in general
        # a hanging job will have is_active=1 is_complete=0, while a special
        # task which completed will have is_active=0 is_complete=1. To check
        # aborts we directly check active because the complete bit is set in
        # several places, including the epilog of agent tasks.
        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
                                                          is_aborted=True)
        for task in aborted_tasks:
            # There are 2 ways to get the agent associated with a task,
            # through the host and through the hqe. A special task
            # always needs a host, but doesn't always need a hqe.
            for agent in self._host_agents.get(task.host.id, []):
                if isinstance(agent.task, agent_task.SpecialAgentTask):

                    # The epilog performs critical actions such as
                    # queueing the next SpecialTask, requeuing the
                    # hqe etc, however it doesn't actually kill the
                    # monitor process and set the 'done' bit. Epilogs
                    # assume that the job failed, and that the monitor
                    # process has already written an exit code. The
                    # done bit is a necessary condition for
                    # _handle_agents to schedule any more special
                    # tasks against the host, and it must be set
                    # in addition to is_active, is_complete and success.
                    # NOTE: epilog() must run before abort() here.
                    agent.task.epilog()
                    agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700915
916
showard324bf812009-01-20 23:23:38 +0000917 def _can_start_agent(self, agent, num_started_this_cycle,
918 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000919 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000920 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000921 return True
922 # don't allow any nonzero-process agents to run after we've reached a
923 # limit (this avoids starvation of many-process agents)
924 if have_reached_limit:
925 return False
926 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000927 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000928 agent.task.owner_username,
929 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000930 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000931 return False
932 # if a single agent exceeds the per-cycle throttling, still allow it to
933 # run when it's the first agent in the cycle
934 if num_started_this_cycle == 0:
935 return True
936 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +0000937 if (num_started_this_cycle + agent.task.num_processes >
938 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000939 return False
940 return True
941
942
    def _handle_agents(self):
        """
        Handles agents of the dispatcher.

        Appropriate Agents are added to the dispatcher through
        _schedule_running_host_queue_entries. These agents each
        have a task. This method runs the agents task through
        agent.tick() leading to:
            agent.start
                prolog -> AgentTasks prolog
                          For each queue entry:
                            sets host status/status to Running
                            set started_on in afe_host_queue_entries
                run -> AgentTasks run
                        Creates PidfileRunMonitor
                        Queues the autoserv command line for this AgentTask
                        via the drone manager. These commands are executed
                        through the drone managers execute actions.
                poll -> AgentTasks/BaseAgentTask poll
                        checks the monitors exit_code.
                        Executes epilog if task is finished.
                        Executes AgentTasks _finish_task
                finish_task is usually responsible for setting the status
                of the HQE/host, and updating its active and complete fields.

            agent.is_done
                Removes the agent from the dispatchers _agents queue.
                Is_done checks the finished bit on the agent, that is
                set based on the Agents task. During the agents poll
                we check to see if the monitor process has exited in
                its finish method, and set the success member of the
                task based on this exit code.
        """
        # Processes started during this tick; used for per-cycle throttling
        # in _can_start_agent.
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                # Only not-yet-started agents count against throttling;
                # already-running agents must keep being ticked.
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_cycle += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes. %d added this cycle.',
                     _drone_manager.total_running_processes(),
                     num_started_this_cycle)
mbligh36768f02008-02-22 18:28:33 +00001000
1001
    def _process_recurring_runs(self):
        """Instantiate jobs for every RecurringRun whose start date is due.

        For each due run, a new job is created from the template job's
        info/options; one-time hosts are re-created. Afterwards the run is
        either deleted (last iteration) or rescheduled by loop_period
        (loop_count == 0 means repeat forever).
        """
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            # NOTE(review): 'dependencies' is read but never passed to
            # create_new_job below — confirm whether that is intentional.
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                # Best-effort: log and continue with the remaining runs.
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1041
1042
# Site-specific deployments may provide their own SiteDispatcher in
# autotest_lib.scheduler.site_monitor_db; BaseDispatcher is supplied as the
# fallback class (presumably used when no site module exists — see
# utils.import_site_class).
SiteDispatcher = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    """Concrete dispatcher class used by the scheduler."""
    pass
1049
1050
class Agent(object):
    """Wrapper that lets the Dispatcher drive a single AgentTask.

    An agent mainly associates its task with the queue_entry and host ids
    that the task covers, and tracks whether the task has been started and
    finished.

    All task objects must provide the methods:
        poll()    - Called periodically to let the task check its status
                    and update its internal state.
        is_done() - Returns True if the task is finished.
        abort()   - Called when an abort has been requested. The task must
                    set its aborted attribute to True if it actually
                    aborted.

    and the attributes:
        aborted         - bool, True if this task was aborted.
        success         - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task
                          handles.
        host_ids        - A sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: An instance of an AgentTask.
        """
        self.task = task

        # Populated later by Dispatcher.add_agent().
        self.dispatcher = None

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        """Poll the wrapped task once and record whether it completed."""
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        """@returns True once the wrapped task has finished."""
        return self.finished


    def abort(self):
        """Forward an abort request to the task.

        Tasks can choose to ignore aborts; the agent only finishes when
        the task reports that it actually aborted.
        """
        if not self.task:
            return
        self.task.abort()
        if self.task.aborted:
            self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001106
showardd3dc1992009-04-22 21:01:40 +00001107
beeps5e2bb4a2013-10-28 11:26:45 -07001108class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001109 """
1110 Common functionality for QueueTask and HostlessQueueTask
1111 """
1112 def __init__(self, queue_entries):
1113 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001114 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001115 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001116
1117
showard73ec0442009-02-07 02:05:20 +00001118 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001119 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001120
1121
jamesrenc44ae992010-02-19 00:12:54 +00001122 def _write_control_file(self, execution_path):
1123 control_path = _drone_manager.attach_file_to_execution(
1124 execution_path, self.job.control_file)
1125 return control_path
1126
1127
Aviv Keshet308e7362013-05-21 14:43:16 -07001128 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001129 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001130 execution_path = self.queue_entries[0].execution_path()
1131 control_path = self._write_control_file(execution_path)
1132 hostnames = ','.join(entry.host.hostname
1133 for entry in self.queue_entries
1134 if not entry.is_hostless())
1135
1136 execution_tag = self.queue_entries[0].execution_tag()
1137 params = _autoserv_command_line(
1138 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001139 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001140 _drone_manager.absolute_path(control_path)],
1141 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001142 if self.job.is_image_update_job():
1143 params += ['--image', self.job.update_image_path]
1144
jamesrenc44ae992010-02-19 00:12:54 +00001145 return params
showardd1195652009-12-08 22:21:02 +00001146
1147
1148 @property
1149 def num_processes(self):
1150 return len(self.queue_entries)
1151
1152
1153 @property
1154 def owner_username(self):
1155 return self.job.owner
1156
1157
1158 def _working_directory(self):
1159 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001160
1161
jadmanski0afbb632008-06-06 21:10:57 +00001162 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001163 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001164 keyval_dict = self.job.keyval_dict()
1165 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001166 group_name = self.queue_entries[0].get_group_name()
1167 if group_name:
1168 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001169 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001170 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001171 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001172 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001173
1174
showard35162b02009-03-03 02:17:30 +00001175 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001176 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001177 _drone_manager.write_lines_to_file(error_file_path,
1178 [_LOST_PROCESS_ERROR])
1179
1180
showardd3dc1992009-04-22 21:01:40 +00001181 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001182 if not self.monitor:
1183 return
1184
showardd9205182009-04-27 20:09:55 +00001185 self._write_job_finished()
1186
showard35162b02009-03-03 02:17:30 +00001187 if self.monitor.lost_process:
1188 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001189
jadmanskif7fa2cc2008-10-01 14:13:23 +00001190
showardcbd74612008-11-19 21:42:02 +00001191 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001192 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001193 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001194 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001195 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001196
1197
jadmanskif7fa2cc2008-10-01 14:13:23 +00001198 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001199 if not self.monitor or not self.monitor.has_process():
1200 return
1201
jadmanskif7fa2cc2008-10-01 14:13:23 +00001202 # build up sets of all the aborted_by and aborted_on values
1203 aborted_by, aborted_on = set(), set()
1204 for queue_entry in self.queue_entries:
1205 if queue_entry.aborted_by:
1206 aborted_by.add(queue_entry.aborted_by)
1207 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1208 aborted_on.add(t)
1209
1210 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001211 # TODO(showard): this conditional is now obsolete, we just need to leave
1212 # it in temporarily for backwards compatibility over upgrades. delete
1213 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001214 assert len(aborted_by) <= 1
1215 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001216 aborted_by_value = aborted_by.pop()
1217 aborted_on_value = max(aborted_on)
1218 else:
1219 aborted_by_value = 'autotest_system'
1220 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001221
showarda0382352009-02-11 23:36:43 +00001222 self._write_keyval_after_job("aborted_by", aborted_by_value)
1223 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001224
showardcbd74612008-11-19 21:42:02 +00001225 aborted_on_string = str(datetime.datetime.fromtimestamp(
1226 aborted_on_value))
1227 self._write_status_comment('Job aborted by %s on %s' %
1228 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001229
1230
    def abort(self):
        """Abort the task, then record the abort and finalize the job."""
        super(AbstractQueueTask, self).abort()
        # Write aborted_by/aborted_on keyvals plus a status.log comment.
        self._log_abort()
        # Same end-of-task bookkeeping that epilog() performs on success.
        self._finish_task()


    def epilog(self):
        """Normal completion path: run the base epilog, then finalize."""
        super(AbstractQueueTask, self).epilog()
        self._finish_task()


class QueueTask(AbstractQueueTask):
    """Queue task for a job that runs autoserv against real hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        # Sanity-check that every entry/host is in a state from which this
        # task may legally start.
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        # Mark each host busy (and dirty, so it gets cleaned up afterwards)
        # and record its keyvals before autoserv starts.
        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)


    def _finish_task(self):
        super(QueueTask, self)._finish_task()

        # Hand the entries off to the gathering phase; the hosts remain
        # Running until gathering/cleanup completes.
        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        # Extend the base autoserv invocation so the job repo URL is verified.
        return (super(QueueTask, self)._command_line()
                + ['--verify_job_repo_url'])

class HostlessQueueTask(AbstractQueueTask):
    """Queue task for a job that has no hosts (a hostless job)."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()

        # A job's entries start out in Starting status.  Each scheduler tick
        # picks up Starting jobs and launches them unless a limit (e.g.
        # max_hostless_jobs_per_drone, max_processes_started_per_cycle) is
        # hit, in which case they stay in Starting.  Once launched, the
        # status moves to Running and an autoserv process runs on a drone.
        entry = self.queue_entries[0]
        if entry.status == models.HostQueueEntry.Status.STARTING:
            # Still Starting: no process was ever launched, so there are no
            # logs to parse and execution_subdir is unset -- moving to
            # Parsing would raise in the scheduler.
            return
        entry.set_status(models.HostQueueEntry.Status.PARSING)

# Script entry point: run the scheduler's main() (defined earlier in this
# module, outside this view) when executed directly.
if __name__ == '__main__':
    main()