blob: ab188820b8f5542a881fb498f45f6eeed192cff9 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Aviv Keshet225bdfe2013-03-05 10:10:08 -08002#pylint: disable-msg=C0111
mbligh36768f02008-02-22 18:28:33 +00003
4"""
5Autotest scheduler
6"""
showard909c7a62008-07-15 21:52:38 +00007
Dan Shif6c65bd2014-08-29 16:15:07 -07008import datetime
9import gc
10import logging
11import optparse
12import os
13import signal
14import sys
15import time
showard402934a2009-12-21 22:20:47 +000016
Alex Miller05d7b4c2013-03-04 07:49:38 -080017import common
showard21baa452008-10-21 00:08:39 +000018from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000019
20import django.db
21
Prashanth B0e960282014-05-13 19:38:28 -070022from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070023from autotest_lib.client.common_lib import utils
Michael Liangda8c60a2014-06-03 13:24:51 -070024from autotest_lib.client.common_lib.cros.graphite import stats
Prashanth B0e960282014-05-13 19:38:28 -070025from autotest_lib.frontend.afe import models, rpc_utils
Fang Dengc330bee2014-10-21 18:10:55 -070026from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070027from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
28from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070029from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070030from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070031from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000032from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080033from autotest_lib.scheduler import status_server, scheduler_config
Prashanth Bf66d51b2014-05-06 12:42:25 -070034from autotest_lib.scheduler import scheduler_lib
Aviv Keshet308e7362013-05-21 14:43:16 -070035from autotest_lib.server import autoserv_utils
Dan Shib9144a42014-12-01 16:09:32 -080036from autotest_lib.site_utils import server_manager_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080037
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow an environment variable to override the autotest checkout location.
# NOTE: dict.has_key() is deprecated (and gone in Python 3); use `in` instead.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000051
# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Database connection manager and raw connection; both set in initialize().
_db_manager = None
_db = None
# Flipped to True by handle_signal() (SIGINT/SIGTERM) to stop the main loop.
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
# Singleton DroneManager; set in initialize_globals().
_drone_manager = None
# When True, the dispatcher acquires/releases hosts inline with each tick;
# otherwise it only services jobs that already have hosts assigned.
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)
71
mbligh36768f02008-02-22 18:28:33 +000072
mbligh83c1e9e2009-05-01 23:10:41 +000073def _site_init_monitor_db_dummy():
74 return {}
75
76
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled without a default."""
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise scheduler_lib.SchedulerError(
            'Drone sets are enabled, but no default is set')
83
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    # Currently the only config invariant checked is the drone-set default.
    _verify_default_drone_set_exists()
88
def main():
    """Entry point: run the scheduler, logging any escaping exception.

    SystemExit is re-raised untouched so exit codes propagate; every other
    exception is logged before being re-raised.  The pid file is always
    removed on the way out, even on failure.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000100
101
def main_without_exception_handling():
    """Parse arguments, set up global state and run the dispatcher loop.

    Expects exactly one positional argument: the results directory.  The
    loop runs until a shutdown signal is received or the status server
    requests a scheduler shutdown, then tears down the drone manager and
    database connection.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Allow a site-specific hook to run before the scheduler starts; falls
    # back to the no-op dummy when no site module is present.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop: one tick per iteration until shutdown is requested
        # either by signal or via the status server.
        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except Exception:
        email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    # Orderly teardown; reached on both clean shutdown and caught exception.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000178
179
def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler: request a clean shutdown of the main loop.

    @param signum: signal number delivered (unused).
    @param frame: current stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000184
185
def initialize():
    """Set up process-wide scheduler state before the dispatch loop starts.

    Writes the pid file (aborting if another monitor_db is alive), connects
    to the database, installs signal handlers, and initializes the drone
    manager with the configured drone list and results host.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start a second scheduler against the same pid file.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point the scheduler at the stress-test database instead of prod.
        global_config.global_config.override_config_value(
            scheduler_lib.DB_CONFIG_SECTION, 'database',
            'stresstest_autotest_web')

    # If server database is enabled, check if the server has role `scheduler`.
    # If the server does not have scheduler role, exception will be raised and
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    # Drone list comes from the server DB when enabled, otherwise from the
    # comma-separated 'drones' value in the global config.
    if server_manager_utils.use_server_db():
        drone_list = server_manager_utils.get_drones()
    else:
        drones = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
        drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000230
231
def initialize_globals():
    """Bind the module-level _drone_manager to the DroneManager singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
235
236
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line for a job or queue entry.

    Thin wrapper around autoserv_utils.autoserv_run_job_command that fills
    in the module's autoserv directory and the drone manager's working
    directory.

    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - bool - Whether to pass the verbose flag to autoserv.
    """
    return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
showard87ba02a2009-04-20 19:37:32 +0000254
255
Simran Basia858a232012-08-21 11:04:37 -0700256class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800257
258
    def __init__(self):
        """Read config values and set up per-dispatcher bookkeeping state."""
        # Active Agent objects currently being ticked.
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db)
        # Maps host id -> set of agents, and queue entry id -> set of agents.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        # Config value is in minutes; stored here in seconds.
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
290
mbligh36768f02008-02-22 18:28:33 +0000291
    def initialize(self, recover_hosts=True):
        """Prepare the dispatcher for its first tick.

        @param recover_hosts: when True, also attempt to recover dead hosts.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000301
302
Simran Basi0ec94dd2012-08-28 09:50:10 -0700303 def _log_tick_msg(self, msg):
304 if self._tick_debug:
305 logging.debug(msg)
306
307
Simran Basidef92872012-09-20 13:34:34 -0700308 def _log_extra_msg(self, msg):
309 if self._extra_debugging:
310 logging.debug(msg)
311
312
    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.

        The ordering of the steps below is significant: refreshes are
        triggered early so drone results are ready by sync_refresh(), and
        drone actions are executed only after all scheduling decisions.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
        _drone_manager.trigger_refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
        _drone_manager.sync_refresh()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000356
showard97aed502008-11-04 02:01:24 +0000357
    def _run_cleanup(self):
        """Give both cleanup objects a chance to run; each decides internally
        whether its interval has elapsed."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000361
mbligh36768f02008-02-22 18:28:33 +0000362
    def _garbage_collection(self):
        """Periodically force a GC pass and log collector statistics."""
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()
376
377
showard170873e2009-01-07 00:22:26 +0000378 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
379 for object_id in object_ids:
380 agent_dict.setdefault(object_id, set()).add(agent)
381
382
383 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
384 for object_id in object_ids:
385 assert object_id in agent_dict
386 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700387 # If an ID has no more active agent associated, there is no need to
388 # keep it in the dictionary. Otherwise, scheduler will keep an
389 # unnecessarily big dictionary until being restarted.
390 if not agent_dict[object_id]:
391 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000392
393
    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatchers list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to 1. a dict against the queue_entry_ids given to it 2. A dict
        against the host_ids given to it. So theoretically, a host can have any
        number of agents associated with it, and each of them can have any
        special agent task, though in practice we never see > 1 agent/task per
        host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        # Back-pointer so the agent can remove itself from this dispatcher.
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000414
showard170873e2009-01-07 00:22:26 +0000415
416 def get_agents_for_entry(self, queue_entry):
417 """
418 Find agents corresponding to the specified queue_entry.
419 """
showardd3dc1992009-04-22 21:01:40 +0000420 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000421
422
423 def host_has_agent(self, host):
424 """
425 Determine if there is currently an Agent present using this host.
426 """
427 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000428
429
    def remove_agent(self, agent):
        """Remove *agent* from the active list and both id-indexed maps."""
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000436
437
    def _host_has_scheduled_special_task(self, host):
        """Return True if *host* has a special task queued but not started."""
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))
442
443
    def _recover_processes(self):
        """Re-attach to autoserv processes left over from a prior scheduler
        run, then clean up anything that could not be recovered."""
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000456
showard170873e2009-01-07 00:22:26 +0000457
    def _create_recovery_agent_tasks(self):
        """Return AgentTasks for all recoverable queue entries plus all
        currently-active special tasks."""
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))
461
462
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)
        stats.Gauge('scheduler.jobs_per_tick').send(
                'running', len(queue_entries))

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            # A synchronous job's task covers its whole entry group; mark
            # them all so they are not picked up again.
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000501
502
    def _get_special_task_agent_tasks(self, is_active=False):
        """Return an AgentTask for every incomplete SpecialTask with the
        given is_active flag.

        @param is_active: filter on the SpecialTask.is_active column.
        """
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]
508
509
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        @raises scheduler_lib.SchedulerError: if the entry's status is not
                one handled by an AgentTask.
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000535
536
    def _check_for_duplicate_host_entries(self, task_entries):
        """Assert no entry's host is already claimed by another agent.

        Entries in PARSING/ARCHIVING no longer occupy their host, so they
        are exempt from the check.
        """
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)
545
546
    def _assert_host_has_no_agent(self, entry):
        """Raise SchedulerError if the entry's host already has an agent.

        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            # Grab an arbitrary agent from the host's set for the message.
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise scheduler_lib.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))
556
557
    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
        object contains a hqe it's passed on to the special agent task, which
        creates a HostQueueEntry and saves it as it's queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        @raises scheduler_lib.SchedulerError: if no AgentTask class matches
                the task's type.
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        # Dispatch on the class-level TASK_TYPE marker of each task class.
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise scheduler_lib.SchedulerError(
                'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000588
589
590 def _register_pidfiles(self, agent_tasks):
591 for agent_task in agent_tasks:
592 agent_task.register_necessary_pidfiles()
593
594
    def _recover_tasks(self, agent_tasks):
        """Recover each AgentTask and account for orphaned processes.

        Any autoserv process not reclaimed by a recovered task remains in
        the orphan set and is reported (and possibly fatal) afterwards.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000605
606
showard8cc058f2009-09-08 16:26:33 +0000607 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000608 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
609 % status):
showard0db3d432009-10-12 20:29:15 +0000610 if entry.status == status and not self.get_agents_for_entry(entry):
611 # The status can change during iteration, e.g., if job.run()
612 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000613 yield entry
614
615
showard6878e8b2009-07-20 22:37:45 +0000616 def _check_for_remaining_orphan_processes(self, orphans):
617 if not orphans:
618 return
619 subject = 'Unrecovered orphan autoserv processes remain'
620 message = '\n'.join(str(process) for process in orphans)
621 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000622
623 die_on_orphans = global_config.global_config.get_config_value(
624 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
625
626 if die_on_orphans:
627 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000628
showard170873e2009-01-07 00:22:26 +0000629
showard8cc058f2009-09-08 16:26:33 +0000630 def _recover_pending_entries(self):
631 for entry in self._get_unassigned_entries(
632 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000633 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000634 entry.on_pending()
635
636
showardb8900452009-10-12 20:31:01 +0000637 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000638 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000639 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
640 unrecovered_hqes = []
641 for queue_entry in queue_entries:
642 special_tasks = models.SpecialTask.objects.filter(
643 task__in=(models.SpecialTask.Task.CLEANUP,
644 models.SpecialTask.Task.VERIFY),
645 queue_entry__id=queue_entry.id,
646 is_complete=False)
647 if special_tasks.count() == 0:
648 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000649
showardb8900452009-10-12 20:31:01 +0000650 if unrecovered_hqes:
651 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700652 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000653 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000654 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000655
656
showard65db3932009-10-28 19:54:35 +0000657 def _schedule_special_tasks(self):
658 """
659 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700660
661 Special tasks include PreJobTasks like verify, reset and cleanup.
662 They are created through _schedule_new_jobs and associated with a hqe
663 This method translates SpecialTasks to the appropriate AgentTask and
664 adds them to the dispatchers agents list, so _handle_agents can execute
665 them.
showard65db3932009-10-28 19:54:35 +0000666 """
Prashanth B4ec98672014-05-15 10:44:54 -0700667 # When the host scheduler is responsible for acquisition we only want
668 # to run tasks with leased hosts. All hqe tasks will already have
669 # leased hosts, and we don't want to run frontend tasks till the host
670 # scheduler has vetted the assignment. Note that this doesn't include
671 # frontend tasks with hosts leased by other active hqes.
672 for task in self._job_query_manager.get_prioritized_special_tasks(
673 only_tasks_with_leased_hosts=not _inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000674 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000675 continue
showardd1195652009-12-08 22:21:02 +0000676 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000677
678
showard170873e2009-01-07 00:22:26 +0000679 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000680 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000681 # should never happen
showarded2afea2009-07-07 20:54:07 +0000682 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000683 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000684 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700685 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000686 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000687
688
jadmanski0afbb632008-06-06 21:10:57 +0000689 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000690 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700691 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000692 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000693 if self.host_has_agent(host):
694 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000695 continue
showard8cc058f2009-09-08 16:26:33 +0000696 if self._host_has_scheduled_special_task(host):
697 # host will have a special task scheduled on the next cycle
698 continue
showard170873e2009-01-07 00:22:26 +0000699 if print_message:
showardb18134f2009-03-20 20:52:18 +0000700 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000701 models.SpecialTask.objects.create(
702 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000703 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000704
705
jadmanski0afbb632008-06-06 21:10:57 +0000706 def _recover_hosts(self):
707 # recover "Repair Failed" hosts
708 message = 'Reverifying dead host %s'
709 self._reverify_hosts_where("status = 'Repair Failed'",
710 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000711
712
showard89f84db2009-03-12 20:39:13 +0000713 def _refresh_pending_queue_entries(self):
714 """
715 Lookup the pending HostQueueEntries and call our HostScheduler
716 refresh() method given that list. Return the list.
717
718 @returns A list of pending HostQueueEntries sorted in priority order.
719 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700720 queue_entries = self._job_query_manager.get_pending_queue_entries(
721 only_hostless=not _inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000722 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000723 return []
showard89f84db2009-03-12 20:39:13 +0000724 return queue_entries
725
726
showarda9545c02009-12-18 22:44:26 +0000727 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800728 """Schedule a hostless (suite) job.
729
730 @param queue_entry: The queue_entry representing the hostless job.
731 """
showarda9545c02009-12-18 22:44:26 +0000732 self.add_agent_task(HostlessQueueTask(queue_entry))
Jakob Juelichd615a1e2014-09-04 11:48:36 -0700733
734 # Need to set execution_subdir before setting the status:
735 # After a restart of the scheduler, agents will be restored for HQEs in
736 # Starting, Running, Gathering, Parsing or Archiving. To do this, the
737 # execution_subdir is needed. Therefore it must be set before entering
738 # one of these states.
739 # Otherwise, if the scheduler was interrupted between setting the status
740 # and the execution_subdir, upon it's restart restoring agents would
741 # fail.
742 # Is there a way to get a status in one of these states without going
743 # through this code? Following cases are possible:
744 # - If it's aborted before being started:
745 # active bit will be 0, so there's nothing to parse, it will just be
746 # set to completed by _find_aborting. Critical statuses are skipped.
747 # - If it's aborted or it fails after being started:
748 # It was started, so this code was executed.
749 queue_entry.update_field('execution_subdir', 'hostless')
jamesren47bd7372010-03-13 00:58:17 +0000750 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000751
752
beepscc9fc702013-12-02 12:45:38 -0800753 def _schedule_host_job(self, host, queue_entry):
754 """Schedules a job on the given host.
755
756 1. Assign the host to the hqe, if it isn't already assigned.
757 2. Create a SpecialAgentTask for the hqe.
758 3. Activate the hqe.
759
760 @param queue_entry: The job to schedule.
761 @param host: The host to schedule the job on.
762 """
763 if self.host_has_agent(host):
764 host_agent_task = list(self._host_agents.get(host.id))[0].task
765 subject = 'Host with agents assigned to an HQE'
766 message = ('HQE: %s assigned host %s, but the host has '
767 'agent: %s for queue_entry %s. The HQE '
beepsf7d35162014-02-13 16:42:13 -0800768 'will have to try and acquire a host next tick ' %
beepscc9fc702013-12-02 12:45:38 -0800769 (queue_entry, host.hostname, host_agent_task,
770 host_agent_task.queue_entry))
771 email_manager.manager.enqueue_notify_email(subject, message)
beepscc9fc702013-12-02 12:45:38 -0800772 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700773 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800774
775
showard89f84db2009-03-12 20:39:13 +0000776 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700777 """
778 Find any new HQEs and call schedule_pre_job_tasks for it.
779
780 This involves setting the status of the HQE and creating a row in the
781 db corresponding the the special task, through
782 scheduler_models._queue_special_task. The new db row is then added as
783 an agent to the dispatcher through _schedule_special_tasks and
784 scheduled for execution on the drone through _handle_agents.
785 """
showard89f84db2009-03-12 20:39:13 +0000786 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000787
beepscc9fc702013-12-02 12:45:38 -0800788 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700789 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700790 new_jobs_with_hosts = 0
791 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800792 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700793 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000794
beepscc9fc702013-12-02 12:45:38 -0800795 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000796 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000797 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700798 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000799 else:
beepscc9fc702013-12-02 12:45:38 -0800800 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700801 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700802
beepsb255fc52013-10-13 23:28:54 -0700803 stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
beepscc9fc702013-12-02 12:45:38 -0800804 if not host_jobs:
805 return
Prashanth Bf66d51b2014-05-06 12:42:25 -0700806 if not _inline_host_acquisition:
807 message = ('Found %s jobs that need hosts though '
808 '_inline_host_acquisition=%s. Will acquire hosts.' %
809 ([str(job) for job in host_jobs],
810 _inline_host_acquisition))
811 email_manager.manager.enqueue_notify_email(
812 'Processing unexpected host acquisition requests', message)
813 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
814 for host_assignment in jobs_with_hosts:
815 self._schedule_host_job(host_assignment.host, host_assignment.job)
816 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800817
beepsb255fc52013-10-13 23:28:54 -0700818 stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
819 stats.Gauge(key).send('new_jobs_without_hosts',
820 new_jobs_need_hosts - new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000821
822
showard8cc058f2009-09-08 16:26:33 +0000823 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700824 """
825 Adds agents to the dispatcher.
826
827 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
828 QueueTask for example, will have a job with a control file, and
829 the agent will have methods that poll, abort and check if the queue
830 task is finished. The dispatcher runs the agent_task, as well as
831 other agents in it's _agents member, through _handle_agents, by
832 calling the Agents tick().
833
834 This method creates an agent for each HQE in one of (starting, running,
835 gathering, parsing, archiving) states, and adds it to the dispatcher so
836 it is handled by _handle_agents.
837 """
showardd1195652009-12-08 22:21:02 +0000838 for agent_task in self._get_queue_entry_agent_tasks():
839 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000840
841
842 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000843 for entry in scheduler_models.HostQueueEntry.fetch(
844 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000845 task = entry.job.schedule_delayed_callback_task(entry)
846 if task:
showardd1195652009-12-08 22:21:02 +0000847 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000848
849
jadmanski0afbb632008-06-06 21:10:57 +0000850 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700851 """
852 Looks through the afe_host_queue_entries for an aborted entry.
853
854 The aborted bit is set on an HQE in many ways, the most common
855 being when a user requests an abort through the frontend, which
856 results in an rpc from the afe to abort_host_queue_entries.
857 """
jamesrene7c65cb2010-06-08 20:38:10 +0000858 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000859 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700860 where='aborted=1 and complete=0'):
showardf4a2e502009-07-28 20:06:39 +0000861 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800862
863 # The task would have started off with both is_complete and
864 # is_active = False. Aborted tasks are neither active nor complete.
865 # For all currently active tasks this will happen through the agent,
866 # but we need to manually update the special tasks that haven't
867 # started yet, because they don't have agents.
868 models.SpecialTask.objects.filter(is_active=False,
869 queue_entry_id=entry.id).update(is_complete=True)
870
showardd3dc1992009-04-22 21:01:40 +0000871 for agent in self.get_agents_for_entry(entry):
872 agent.abort()
873 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000874 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700875 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000876 for job in jobs_to_stop:
877 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000878
879
beeps8bb1f7d2013-08-05 01:30:09 -0700880 def _find_aborted_special_tasks(self):
881 """
882 Find SpecialTasks that have been marked for abortion.
883
884 Poll the database looking for SpecialTasks that are active
885 and have been marked for abortion, then abort them.
886 """
887
888 # The completed and active bits are very important when it comes
889 # to scheduler correctness. The active bit is set through the prolog
890 # of a special task, and reset through the cleanup method of the
891 # SpecialAgentTask. The cleanup is called both through the abort and
892 # epilog. The complete bit is set in several places, and in general
893 # a hanging job will have is_active=1 is_complete=0, while a special
894 # task which completed will have is_active=0 is_complete=1. To check
895 # aborts we directly check active because the complete bit is set in
896 # several places, including the epilog of agent tasks.
897 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
898 is_aborted=True)
899 for task in aborted_tasks:
900 # There are 2 ways to get the agent associated with a task,
901 # through the host and through the hqe. A special task
902 # always needs a host, but doesn't always need a hqe.
903 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700904 if isinstance(agent.task, agent_task.SpecialAgentTask):
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000905
beeps8bb1f7d2013-08-05 01:30:09 -0700906 # The epilog preforms critical actions such as
907 # queueing the next SpecialTask, requeuing the
908 # hqe etc, however it doesn't actually kill the
909 # monitor process and set the 'done' bit. Epilogs
910 # assume that the job failed, and that the monitor
911 # process has already written an exit code. The
912 # done bit is a necessary condition for
913 # _handle_agents to schedule any more special
914 # tasks against the host, and it must be set
915 # in addition to is_active, is_complete and success.
916 agent.task.epilog()
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000917 agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700918
919
showard324bf812009-01-20 23:23:38 +0000920 def _can_start_agent(self, agent, num_started_this_cycle,
921 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000922 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000923 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000924 return True
925 # don't allow any nonzero-process agents to run after we've reached a
926 # limit (this avoids starvation of many-process agents)
927 if have_reached_limit:
928 return False
929 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000930 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000931 agent.task.owner_username,
932 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000933 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000934 return False
935 # if a single agent exceeds the per-cycle throttling, still allow it to
936 # run when it's the first agent in the cycle
937 if num_started_this_cycle == 0:
938 return True
939 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +0000940 if (num_started_this_cycle + agent.task.num_processes >
941 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000942 return False
943 return True
944
945
jadmanski0afbb632008-06-06 21:10:57 +0000946 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700947 """
948 Handles agents of the dispatcher.
949
950 Appropriate Agents are added to the dispatcher through
951 _schedule_running_host_queue_entries. These agents each
952 have a task. This method runs the agents task through
953 agent.tick() leading to:
954 agent.start
955 prolog -> AgentTasks prolog
956 For each queue entry:
957 sets host status/status to Running
958 set started_on in afe_host_queue_entries
959 run -> AgentTasks run
960 Creates PidfileRunMonitor
961 Queues the autoserv command line for this AgentTask
962 via the drone manager. These commands are executed
963 through the drone managers execute actions.
964 poll -> AgentTasks/BaseAgentTask poll
965 checks the monitors exit_code.
966 Executes epilog if task is finished.
967 Executes AgentTasks _finish_task
968 finish_task is usually responsible for setting the status
969 of the HQE/host, and updating it's active and complete fileds.
970
971 agent.is_done
972 Removed the agent from the dispatchers _agents queue.
973 Is_done checks the finished bit on the agent, that is
974 set based on the Agents task. During the agents poll
975 we check to see if the monitor process has exited in
976 it's finish method, and set the success member of the
977 task based on this exit code.
978 """
jadmanski0afbb632008-06-06 21:10:57 +0000979 num_started_this_cycle = 0
Prashanth Balasubramanian628bfcf2014-10-02 12:44:13 -0700980 num_finished_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000981 have_reached_limit = False
982 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -0700983 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +0000984 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -0700985 self._log_extra_msg('Processing Agent with Host Ids: %s and '
986 'queue_entry ids:%s' % (agent.host_ids,
987 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +0000988 if not agent.started:
showard324bf812009-01-20 23:23:38 +0000989 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000990 have_reached_limit):
991 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -0700992 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +0000993 continue
showardd1195652009-12-08 22:21:02 +0000994 num_started_this_cycle += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -0700995 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +0000996 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -0700997 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +0000998 if agent.is_done():
Prashanth Balasubramanian628bfcf2014-10-02 12:44:13 -0700999 num_finished_this_cycle += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001000 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +00001001 self.remove_agent(agent)
Prashanth Balasubramanian628bfcf2014-10-02 12:44:13 -07001002 stats.Gauge('scheduler.jobs_per_tick').send(
1003 'agents_started', num_started_this_cycle)
1004 stats.Gauge('scheduler.jobs_per_tick').send(
1005 'agents_finished', num_finished_this_cycle)
Simran Basi3f6717d2012-09-13 15:21:22 -07001006 logging.info('%d running processes. %d added this cycle.',
1007 _drone_manager.total_running_processes(),
1008 num_started_this_cycle)
mbligh36768f02008-02-22 18:28:33 +00001009
1010
showard29f7cd22009-04-29 21:16:24 +00001011 def _process_recurring_runs(self):
1012 recurring_runs = models.RecurringRun.objects.filter(
1013 start_date__lte=datetime.datetime.now())
1014 for rrun in recurring_runs:
1015 # Create job from template
1016 job = rrun.job
1017 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001018 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001019
1020 host_objects = info['hosts']
1021 one_time_hosts = info['one_time_hosts']
1022 metahost_objects = info['meta_hosts']
1023 dependencies = info['dependencies']
1024 atomic_group = info['atomic_group']
1025
1026 for host in one_time_hosts or []:
1027 this_host = models.Host.create_one_time_host(host.hostname)
1028 host_objects.append(this_host)
1029
1030 try:
1031 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001032 options=options,
showard29f7cd22009-04-29 21:16:24 +00001033 host_objects=host_objects,
1034 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001035 atomic_group=atomic_group)
1036
1037 except Exception, ex:
1038 logging.exception(ex)
1039 #TODO send email
1040
1041 if rrun.loop_count == 1:
1042 rrun.delete()
1043 else:
1044 if rrun.loop_count != 0: # if not infinite loop
1045 # calculate new start_date
1046 difference = datetime.timedelta(seconds=rrun.loop_period)
1047 rrun.start_date = rrun.start_date + difference
1048 rrun.loop_count -= 1
1049 rrun.save()
1050
1051
# Load a site-specific dispatcher subclass if the site_monitor_db module
# provides one, defaulting to BaseDispatcher otherwise.
SiteDispatcher = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    """Concrete dispatcher class, with any site-specific overrides applied."""
    pass
1058
1059
class Agent(object):
    """
    Wrapper the Dispatcher uses to drive a single AgentTask.

    An Agent mainly serves to associate its AgentTask with the task's
    queue_entry and host ids, and to track whether the task has started
    and finished.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state. If the task succeeded.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested. The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: An instance of an AgentTask.
        """
        self.task = task

        # Filled in by Dispatcher.add_agent().
        self.dispatcher = None

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        """Poll the underlying task once; mark finished when it is done."""
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        """@returns True once the wrapped task has completed."""
        return self.finished


    def abort(self):
        """Forward an abort request to the task; tasks may ignore aborts."""
        if not self.task:
            return
        self.task.abort()
        if self.task.aborted:
            self.finished = True
showardd3dc1992009-04-22 21:01:40 +00001116
beeps5e2bb4a2013-10-28 11:26:45 -07001117class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001118 """
1119 Common functionality for QueueTask and HostlessQueueTask
1120 """
1121 def __init__(self, queue_entries):
1122 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001123 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001124 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001125
1126
showard73ec0442009-02-07 02:05:20 +00001127 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001128 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001129
1130
jamesrenc44ae992010-02-19 00:12:54 +00001131 def _write_control_file(self, execution_path):
1132 control_path = _drone_manager.attach_file_to_execution(
1133 execution_path, self.job.control_file)
1134 return control_path
1135
1136
Aviv Keshet308e7362013-05-21 14:43:16 -07001137 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001138 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001139 execution_path = self.queue_entries[0].execution_path()
1140 control_path = self._write_control_file(execution_path)
1141 hostnames = ','.join(entry.host.hostname
1142 for entry in self.queue_entries
1143 if not entry.is_hostless())
1144
1145 execution_tag = self.queue_entries[0].execution_tag()
1146 params = _autoserv_command_line(
1147 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001148 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001149 _drone_manager.absolute_path(control_path)],
1150 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001151 if self.job.is_image_update_job():
1152 params += ['--image', self.job.update_image_path]
1153
jamesrenc44ae992010-02-19 00:12:54 +00001154 return params
showardd1195652009-12-08 22:21:02 +00001155
1156
1157 @property
1158 def num_processes(self):
1159 return len(self.queue_entries)
1160
1161
1162 @property
1163 def owner_username(self):
1164 return self.job.owner
1165
1166
1167 def _working_directory(self):
1168 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001169
1170
jadmanski0afbb632008-06-06 21:10:57 +00001171 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001172 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001173 keyval_dict = self.job.keyval_dict()
1174 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001175 group_name = self.queue_entries[0].get_group_name()
1176 if group_name:
1177 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001178 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001179 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001180 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001181 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001182
1183
showard35162b02009-03-03 02:17:30 +00001184 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001185 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001186 _drone_manager.write_lines_to_file(error_file_path,
1187 [_LOST_PROCESS_ERROR])
1188
1189
showardd3dc1992009-04-22 21:01:40 +00001190 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001191 if not self.monitor:
1192 return
1193
showardd9205182009-04-27 20:09:55 +00001194 self._write_job_finished()
1195
showard35162b02009-03-03 02:17:30 +00001196 if self.monitor.lost_process:
1197 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001198
jadmanskif7fa2cc2008-10-01 14:13:23 +00001199
showardcbd74612008-11-19 21:42:02 +00001200 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001201 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001202 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001203 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001204 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001205
1206
jadmanskif7fa2cc2008-10-01 14:13:23 +00001207 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001208 if not self.monitor or not self.monitor.has_process():
1209 return
1210
jadmanskif7fa2cc2008-10-01 14:13:23 +00001211 # build up sets of all the aborted_by and aborted_on values
1212 aborted_by, aborted_on = set(), set()
1213 for queue_entry in self.queue_entries:
1214 if queue_entry.aborted_by:
1215 aborted_by.add(queue_entry.aborted_by)
1216 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1217 aborted_on.add(t)
1218
1219 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001220 # TODO(showard): this conditional is now obsolete, we just need to leave
1221 # it in temporarily for backwards compatibility over upgrades. delete
1222 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001223 assert len(aborted_by) <= 1
1224 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001225 aborted_by_value = aborted_by.pop()
1226 aborted_on_value = max(aborted_on)
1227 else:
1228 aborted_by_value = 'autotest_system'
1229 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001230
showarda0382352009-02-11 23:36:43 +00001231 self._write_keyval_after_job("aborted_by", aborted_by_value)
1232 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001233
showardcbd74612008-11-19 21:42:02 +00001234 aborted_on_string = str(datetime.datetime.fromtimestamp(
1235 aborted_on_value))
1236 self._write_status_comment('Job aborted by %s on %s' %
1237 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001238
1239
    def abort(self):
        """Abort the task: run the superclass abort (kills the process),
        then log abort metadata and perform end-of-task bookkeeping."""
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()
showard21baa452008-10-21 00:08:39 +00001244
1245
    def epilog(self):
        """Run normal end-of-task bookkeeping after the process exits."""
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001249
1250
class QueueTask(AbstractQueueTask):
    """An AbstractQueueTask for jobs that run against real hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Validate entry/host states, then mark each host as running."""
        allowed_hqe = (models.HostQueueEntry.Status.STARTING,
                       models.HostQueueEntry.Status.RUNNING)
        allowed_host = (models.Host.Status.PENDING,
                        models.Host.Status.RUNNING)
        self._check_queue_entry_statuses(self.queue_entries,
                                         allowed_hqe_statuses=allowed_hqe,
                                         allowed_host_statuses=allowed_host)

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)


    def _finish_task(self):
        """Move entries to GATHERING; hosts stay RUNNING for log collection."""
        super(QueueTask, self)._finish_task()

        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Extend the base autoserv invocation to verify the job repo URL."""
        return (super(QueueTask, self)._command_line()
                + ['--verify_job_repo_url'])
1284
1285
class HostlessQueueTask(AbstractQueueTask):
    """An AbstractQueueTask for server-side jobs that use no hosts."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()

        # A newly-added job always begins in Starting status.  Each scheduler
        # tick picks up Starting jobs and, unless it hits a limit (e.g.
        # max_hostless_jobs_per_drone, max_processes_started_per_cycle),
        # flips them to Running and launches an autoserv process on a drone.
        # An entry still in Starting therefore never ran: it has no
        # execution_subdir yet, and handing it to the parser would make the
        # scheduler raise.  Only advance entries that actually started.
        entry = self.queue_entries[0]
        if entry.status != models.HostQueueEntry.Status.STARTING:
            entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001314
1315
# Script entry point: run the scheduler's main() when executed directly.
if __name__ == '__main__':
    main()