blob: ef767960707bd0d02929bac7f37627cee38bc318 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
mbligh36768f02008-02-22 18:28:33 +00002"""
3Autotest scheduler
4"""
showard909c7a62008-07-15 21:52:38 +00005
Dan Shif6c65bd2014-08-29 16:15:07 -07006import datetime
7import gc
8import logging
9import optparse
10import os
11import signal
12import sys
13import time
showard402934a2009-12-21 22:20:47 +000014
Alex Miller05d7b4c2013-03-04 07:49:38 -080015import common
showard21baa452008-10-21 00:08:39 +000016from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000017
18import django.db
19
Dan Shiec1d47d2015-02-13 11:38:13 -080020from autotest_lib.client.common_lib import control_data
Prashanth B0e960282014-05-13 19:38:28 -070021from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070022from autotest_lib.client.common_lib import utils
Gabe Black1e1c41b2015-02-04 23:55:15 -080023from autotest_lib.client.common_lib.cros.graphite import autotest_stats
Prashanth B0e960282014-05-13 19:38:28 -070024from autotest_lib.frontend.afe import models, rpc_utils
Fang Dengc330bee2014-10-21 18:10:55 -070025from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070026from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
27from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070028from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070029from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070030from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000031from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080032from autotest_lib.scheduler import status_server, scheduler_config
Prashanth Bf66d51b2014-05-06 12:42:25 -070033from autotest_lib.scheduler import scheduler_lib
Aviv Keshet308e7362013-05-21 14:43:16 -070034from autotest_lib.server import autoserv_utils
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080035from autotest_lib.server import utils as server_utils
Dan Shib9144a42014-12-01 16:09:32 -080036from autotest_lib.site_utils import server_manager_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080037
# Pidfile prefixes: PID_FILE_PREFIX guards against a second scheduler
# instance (checked via utils.program_is_alive in initialize(), written via
# utils.write_pid, removed in main()'s finally clause). The babysitter prefix
# is presumably used by the monitor_db_babysitter wrapper -- confirm there.
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Overwritten with the results_dir positional argument at startup.
RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the autotest root to be overridden through the environment.
# Use the `in` operator rather than dict.has_key(), which is deprecated in
# Python 2 and removed in Python 3.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000051
# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Database connection manager and connection; both are bound in initialize().
_db_manager = None
_db = None
# Flipped to True by handle_signal() (SIGINT/SIGTERM); the main loop in
# main_without_exception_handling() polls it to exit cleanly.
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
# Set by the --test flag; initialize() then redirects the scheduler at the
# 'stresstest_autotest_web' database.
_testing_mode = False
# Singleton drone manager; bound in initialize_globals().
_drone_manager = None
# When True the scheduler acquires and releases hosts for jobs inline with
# the tick; when False it only handles jobs that already have hosts (see
# BaseDispatcher.__init__'s choice of host scheduler).
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)

# NOTE(review): presumably toggles server-side packaging containers for
# autoserv; its consumer is outside this file -- confirm before relying on it.
_enable_ssp_container = global_config.global_config.get_config_value(
        'AUTOSERV', 'enable_ssp_container', type=bool,
        default=True)
mbligh36768f02008-02-22 18:28:33 +000075
def _site_init_monitor_db_dummy():
    """Fallback site-init hook used when no site_monitor_db module exists."""
    return dict()
78
79
def _verify_default_drone_set_exists():
    """Fail fast when drone sets are enabled but no default is configured."""
    if not models.DroneSet.drone_sets_enabled():
        # Drone sets are off entirely; nothing to validate (and we must not
        # query the default name, preserving the original short-circuit).
        return
    if not models.DroneSet.default_drone_set_name():
        raise scheduler_lib.SchedulerError(
                'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000085
86
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    # Currently the only check: drone-set configuration must be coherent.
    _verify_default_drone_set_exists()
90
91
def main():
    """Entry point: run the scheduler, always removing the pid file on exit."""
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # Deliberate exits (e.g. sys.exit on config errors) propagate
            # without being logged as crashes.
            raise
        except:
            # Intentionally bare: log absolutely anything else that escapes
            # (including non-Exception raises) before re-raising.
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        # Runs on every exit path so a stale pid file never blocks a restart.
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000103
104
def main_without_exception_handling():
    """Parse options, start the status server, and run the dispatcher loop.

    Loops calling dispatcher.tick() until shutdown is requested either by a
    signal (module global _shutdown) or through the status server, then
    flushes queued emails and tears down the server, drone manager and DB.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    # Exactly one positional argument is required: the results directory.
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Optional site-specific initialization hook; falls back to a no-op.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        # Swap in the dummy autoserv binary and flag test mode for the rest
        # of the module (initialize() also switches the database).
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)
        minimum_tick_sec = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)

        while not _shutdown and not server._shutdown_scheduler:
            start = time.time()
            dispatcher.tick()
            curr_tick_sec = time.time() - start
            # Pad short ticks up to the configured minimum; always yield the
            # CPU briefly even when a tick ran long.
            if (minimum_tick_sec > curr_tick_sec):
                time.sleep(minimum_tick_sec - curr_tick_sec)
            else:
                time.sleep(0.0001)
    except Exception:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Orderly teardown happens whether the loop exited cleanly or crashed.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000188
189
def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler: ask the main loop to exit after this tick."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000194
195
def initialize():
    """One-time startup: pidfile guard, DB connection, signals, drones.

    Exits the process if another scheduler instance already holds the
    pidfile, or (when the server DB is in use) raises if this host lacks
    the 'scheduler' role.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Enforce a single running scheduler via the pidfile.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point the scheduler at the stress-test database instead of prod.
        global_config.global_config.override_config_value(
            scheduler_lib.DB_CONFIG_SECTION, 'database',
            'stresstest_autotest_web')

    # If server database is enabled, check if the server has role `scheduler`.
    # If the server does not have scheduler role, exception will be raised and
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    # Drone list comes from the server DB when enabled, otherwise from the
    # comma-separated 'drones' config value.
    if server_manager_utils.use_server_db():
        drone_list = server_manager_utils.get_drones()
    else:
        drones = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
        drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000240
241
def initialize_globals():
    """Bind the module-level drone manager singleton for the rest of the file."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
245
246
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - bool - Whether to pass the verbose flag to autoserv.
    """
    # Delegates entirely to autoserv_utils; _autoserv_directory is a module
    # global so tests can substitute it.
    return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
showard87ba02a2009-04-20 19:37:32 +0000264
265
Simran Basia858a232012-08-21 11:04:37 -0700266class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800267
268
    def __init__(self):
        # Active Agent objects currently being driven by this dispatcher.
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db)
        # Indexes: host id -> set of agents, queue entry id -> set of agents.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        # Rate limiting state for _garbage_collection().
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        # Debug-logging switches consumed by _log_tick_msg/_log_extra_msg.
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
300
mbligh36768f02008-02-22 18:28:33 +0000301
    def initialize(self, recover_hosts=True):
        """Prime cleanup tasks and recover state left by a previous scheduler.

        @param recover_hosts: when True, also attempt to recover dead hosts.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000311
312
Simran Basi0ec94dd2012-08-28 09:50:10 -0700313 def _log_tick_msg(self, msg):
314 if self._tick_debug:
315 logging.debug(msg)
316
317
Simran Basidef92872012-09-20 13:34:34 -0700318 def _log_extra_msg(self, msg):
319 if self._extra_debugging:
320 logging.debug(msg)
321
322
    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.

        NOTE: step order matters here; do not reorder without understanding
        the scheduling pipeline (refresh is triggered early and synced later,
        presumably so drone refresh overlaps the scheduling work -- confirm
        in drone_manager before changing).
        """
        timer = autotest_stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
        _drone_manager.trigger_refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
        _drone_manager.sync_refresh()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        # Clear Django's per-request query log so it doesn't grow unboundedly
        # in this long-lived process.
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000366
showard97aed502008-11-04 02:01:24 +0000367
    def _run_cleanup(self):
        """Give both cleanup tasks a chance to run; each rate-limits itself."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000371
mbligh36768f02008-02-22 18:28:33 +0000372
showardf13a9e22009-12-18 22:54:09 +0000373 def _garbage_collection(self):
374 threshold_time = time.time() - self._seconds_between_garbage_stats
375 if threshold_time < self._last_garbage_stats_time:
376 # Don't generate these reports very often.
377 return
378
379 self._last_garbage_stats_time = time.time()
380 # Force a full level 0 collection (because we can, it doesn't hurt
381 # at this interval).
382 gc.collect()
383 logging.info('Logging garbage collector stats on tick %d.',
384 self._tick_count)
385 gc_stats._log_garbage_collector_stats()
386
387
showard170873e2009-01-07 00:22:26 +0000388 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
389 for object_id in object_ids:
390 agent_dict.setdefault(object_id, set()).add(agent)
391
392
393 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
394 for object_id in object_ids:
395 assert object_id in agent_dict
396 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700397 # If an ID has no more active agent associated, there is no need to
398 # keep it in the dictionary. Otherwise, scheduler will keep an
399 # unnecessarily big dictionary until being restarted.
400 if not agent_dict[object_id]:
401 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000402
403
    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatchers list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to 1. a dict against the queue_entry_ids given to it 2. A dict
        against the host_ids given to it. So theoretically, a host can have any
        number of agents associated with it, and each of them can have any
        special agent task, though in practice we never see > 1 agent/task per
        host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        # Back-reference so the agent can reach the dispatcher's state.
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000424
showard170873e2009-01-07 00:22:26 +0000425
426 def get_agents_for_entry(self, queue_entry):
427 """
428 Find agents corresponding to the specified queue_entry.
429 """
showardd3dc1992009-04-22 21:01:40 +0000430 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000431
432
433 def host_has_agent(self, host):
434 """
435 Determine if there is currently an Agent present using this host.
436 """
437 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000438
439
jadmanski0afbb632008-06-06 21:10:57 +0000440 def remove_agent(self, agent):
441 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000442 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
443 agent)
444 self._unregister_agent_for_ids(self._queue_entry_agents,
445 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000446
447
    def _host_has_scheduled_special_task(self, host):
        """True when *host* has a special task queued but not yet started."""
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))
452
453
    def _recover_processes(self):
        """Reattach to autoserv processes surviving a scheduler restart.

        Builds recovery agent tasks, registers their pidfiles, refreshes the
        drone manager, then recovers tasks/pending entries and reverifies
        whatever hosts remain unaccounted for.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        # Pidfiles must be registered before refresh() so the drone manager
        # can match running processes against them.
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000466
showard170873e2009-01-07 00:22:26 +0000467
showardd1195652009-12-08 22:21:02 +0000468 def _create_recovery_agent_tasks(self):
469 return (self._get_queue_entry_agent_tasks()
470 + self._get_special_task_agent_tasks(is_active=True))
471
472
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)
        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
                'running', len(queue_entries))

        agent_tasks = []
        # Entries grouped under an earlier task must not get a second task.
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000511
512
showardd1195652009-12-08 22:21:02 +0000513 def _get_special_task_agent_tasks(self, is_active=False):
514 special_tasks = models.SpecialTask.objects.filter(
515 is_active=is_active, is_complete=False)
516 return [self._get_agent_task_for_special_task(task)
517 for task in special_tasks]
518
519
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        @raises SchedulerError: when the entry is in a status no AgentTask
                handles directly.
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        # Dispatch on the entry's status to the matching task class.
        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000545
546
547 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000548 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
549 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000550 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000551 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000552 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000553 if using_host:
showardd1195652009-12-08 22:21:02 +0000554 self._assert_host_has_no_agent(task_entry)
555
556
    def _assert_host_has_no_agent(self, entry):
        """
        Raise SchedulerError if the entry's host already has a live agent.

        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            # Pull an arbitrary agent from the set for the error message.
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise scheduler_lib.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))
566
567
    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
        object contains a hqe it's passed on to the special agent task, which
        creates a HostQueueEntry and saves it as its queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        @raises SchedulerError: when no task class matches special_task.task.
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        # Dispatch on the TASK_TYPE each concrete task class declares.
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise scheduler_lib.SchedulerError(
                'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000598
599
600 def _register_pidfiles(self, agent_tasks):
601 for agent_task in agent_tasks:
602 agent_task.register_necessary_pidfiles()
603
604
605 def _recover_tasks(self, agent_tasks):
606 orphans = _drone_manager.get_orphaned_autoserv_processes()
607
608 for agent_task in agent_tasks:
609 agent_task.recover()
610 if agent_task.monitor and agent_task.monitor.has_process():
611 orphans.discard(agent_task.monitor.get_process())
612 self.add_agent_task(agent_task)
613
614 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000615
616
showard8cc058f2009-09-08 16:26:33 +0000617 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000618 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
619 % status):
showard0db3d432009-10-12 20:29:15 +0000620 if entry.status == status and not self.get_agents_for_entry(entry):
621 # The status can change during iteration, e.g., if job.run()
622 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000623 yield entry
624
625
showard6878e8b2009-07-20 22:37:45 +0000626 def _check_for_remaining_orphan_processes(self, orphans):
627 if not orphans:
628 return
629 subject = 'Unrecovered orphan autoserv processes remain'
630 message = '\n'.join(str(process) for process in orphans)
631 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000632
633 die_on_orphans = global_config.global_config.get_config_value(
634 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
635
636 if die_on_orphans:
637 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000638
showard170873e2009-01-07 00:22:26 +0000639
showard8cc058f2009-09-08 16:26:33 +0000640 def _recover_pending_entries(self):
641 for entry in self._get_unassigned_entries(
642 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000643 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000644 entry.on_pending()
645
646
showardb8900452009-10-12 20:31:01 +0000647 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000648 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000649 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
650 unrecovered_hqes = []
651 for queue_entry in queue_entries:
652 special_tasks = models.SpecialTask.objects.filter(
653 task__in=(models.SpecialTask.Task.CLEANUP,
654 models.SpecialTask.Task.VERIFY),
655 queue_entry__id=queue_entry.id,
656 is_complete=False)
657 if special_tasks.count() == 0:
658 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000659
showardb8900452009-10-12 20:31:01 +0000660 if unrecovered_hqes:
661 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700662 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000663 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000664 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000665
666
showard65db3932009-10-28 19:54:35 +0000667 def _schedule_special_tasks(self):
668 """
669 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700670
671 Special tasks include PreJobTasks like verify, reset and cleanup.
672 They are created through _schedule_new_jobs and associated with a hqe
673 This method translates SpecialTasks to the appropriate AgentTask and
674 adds them to the dispatchers agents list, so _handle_agents can execute
675 them.
showard65db3932009-10-28 19:54:35 +0000676 """
Prashanth B4ec98672014-05-15 10:44:54 -0700677 # When the host scheduler is responsible for acquisition we only want
678 # to run tasks with leased hosts. All hqe tasks will already have
679 # leased hosts, and we don't want to run frontend tasks till the host
680 # scheduler has vetted the assignment. Note that this doesn't include
681 # frontend tasks with hosts leased by other active hqes.
682 for task in self._job_query_manager.get_prioritized_special_tasks(
683 only_tasks_with_leased_hosts=not _inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000684 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000685 continue
showardd1195652009-12-08 22:21:02 +0000686 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000687
688
showard170873e2009-01-07 00:22:26 +0000689 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000690 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000691 # should never happen
showarded2afea2009-07-07 20:54:07 +0000692 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000693 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000694 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700695 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000696 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000697
698
jadmanski0afbb632008-06-06 21:10:57 +0000699 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000700 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700701 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000702 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000703 if self.host_has_agent(host):
704 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000705 continue
showard8cc058f2009-09-08 16:26:33 +0000706 if self._host_has_scheduled_special_task(host):
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700707 # host will have a special task scheduled on the next tick
showard8cc058f2009-09-08 16:26:33 +0000708 continue
showard170873e2009-01-07 00:22:26 +0000709 if print_message:
showardb18134f2009-03-20 20:52:18 +0000710 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000711 models.SpecialTask.objects.create(
712 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000713 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000714
715
jadmanski0afbb632008-06-06 21:10:57 +0000716 def _recover_hosts(self):
717 # recover "Repair Failed" hosts
718 message = 'Reverifying dead host %s'
719 self._reverify_hosts_where("status = 'Repair Failed'",
720 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000721
722
showard89f84db2009-03-12 20:39:13 +0000723 def _refresh_pending_queue_entries(self):
724 """
725 Lookup the pending HostQueueEntries and call our HostScheduler
726 refresh() method given that list. Return the list.
727
728 @returns A list of pending HostQueueEntries sorted in priority order.
729 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700730 queue_entries = self._job_query_manager.get_pending_queue_entries(
731 only_hostless=not _inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000732 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000733 return []
showard89f84db2009-03-12 20:39:13 +0000734 return queue_entries
735
736
showarda9545c02009-12-18 22:44:26 +0000737 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800738 """Schedule a hostless (suite) job.
739
740 @param queue_entry: The queue_entry representing the hostless job.
741 """
showarda9545c02009-12-18 22:44:26 +0000742 self.add_agent_task(HostlessQueueTask(queue_entry))
Jakob Juelichd615a1e2014-09-04 11:48:36 -0700743
744 # Need to set execution_subdir before setting the status:
745 # After a restart of the scheduler, agents will be restored for HQEs in
746 # Starting, Running, Gathering, Parsing or Archiving. To do this, the
747 # execution_subdir is needed. Therefore it must be set before entering
748 # one of these states.
749 # Otherwise, if the scheduler was interrupted between setting the status
750 # and the execution_subdir, upon it's restart restoring agents would
751 # fail.
752 # Is there a way to get a status in one of these states without going
753 # through this code? Following cases are possible:
754 # - If it's aborted before being started:
755 # active bit will be 0, so there's nothing to parse, it will just be
756 # set to completed by _find_aborting. Critical statuses are skipped.
757 # - If it's aborted or it fails after being started:
758 # It was started, so this code was executed.
759 queue_entry.update_field('execution_subdir', 'hostless')
jamesren47bd7372010-03-13 00:58:17 +0000760 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000761
762
beepscc9fc702013-12-02 12:45:38 -0800763 def _schedule_host_job(self, host, queue_entry):
764 """Schedules a job on the given host.
765
766 1. Assign the host to the hqe, if it isn't already assigned.
767 2. Create a SpecialAgentTask for the hqe.
768 3. Activate the hqe.
769
770 @param queue_entry: The job to schedule.
771 @param host: The host to schedule the job on.
772 """
773 if self.host_has_agent(host):
774 host_agent_task = list(self._host_agents.get(host.id))[0].task
775 subject = 'Host with agents assigned to an HQE'
776 message = ('HQE: %s assigned host %s, but the host has '
777 'agent: %s for queue_entry %s. The HQE '
beepsf7d35162014-02-13 16:42:13 -0800778 'will have to try and acquire a host next tick ' %
beepscc9fc702013-12-02 12:45:38 -0800779 (queue_entry, host.hostname, host_agent_task,
780 host_agent_task.queue_entry))
781 email_manager.manager.enqueue_notify_email(subject, message)
beepscc9fc702013-12-02 12:45:38 -0800782 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700783 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800784
785
showard89f84db2009-03-12 20:39:13 +0000786 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700787 """
788 Find any new HQEs and call schedule_pre_job_tasks for it.
789
790 This involves setting the status of the HQE and creating a row in the
791 db corresponding the the special task, through
792 scheduler_models._queue_special_task. The new db row is then added as
793 an agent to the dispatcher through _schedule_special_tasks and
794 scheduled for execution on the drone through _handle_agents.
795 """
showard89f84db2009-03-12 20:39:13 +0000796 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000797
beepscc9fc702013-12-02 12:45:38 -0800798 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700799 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700800 new_jobs_with_hosts = 0
801 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800802 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700803 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000804
beepscc9fc702013-12-02 12:45:38 -0800805 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000806 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000807 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700808 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000809 else:
beepscc9fc702013-12-02 12:45:38 -0800810 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700811 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700812
Gabe Black1e1c41b2015-02-04 23:55:15 -0800813 autotest_stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
beepscc9fc702013-12-02 12:45:38 -0800814 if not host_jobs:
815 return
Prashanth Bf66d51b2014-05-06 12:42:25 -0700816 if not _inline_host_acquisition:
817 message = ('Found %s jobs that need hosts though '
818 '_inline_host_acquisition=%s. Will acquire hosts.' %
819 ([str(job) for job in host_jobs],
820 _inline_host_acquisition))
821 email_manager.manager.enqueue_notify_email(
822 'Processing unexpected host acquisition requests', message)
823 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
824 for host_assignment in jobs_with_hosts:
825 self._schedule_host_job(host_assignment.host, host_assignment.job)
826 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800827
Gabe Black1e1c41b2015-02-04 23:55:15 -0800828 autotest_stats.Gauge(key).send('new_jobs_with_hosts',
829 new_jobs_with_hosts)
830 autotest_stats.Gauge(key).send('new_jobs_without_hosts',
831 new_jobs_need_hosts -
832 new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000833
834
showard8cc058f2009-09-08 16:26:33 +0000835 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700836 """
837 Adds agents to the dispatcher.
838
839 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
840 QueueTask for example, will have a job with a control file, and
841 the agent will have methods that poll, abort and check if the queue
842 task is finished. The dispatcher runs the agent_task, as well as
843 other agents in it's _agents member, through _handle_agents, by
844 calling the Agents tick().
845
846 This method creates an agent for each HQE in one of (starting, running,
847 gathering, parsing, archiving) states, and adds it to the dispatcher so
848 it is handled by _handle_agents.
849 """
showardd1195652009-12-08 22:21:02 +0000850 for agent_task in self._get_queue_entry_agent_tasks():
851 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000852
853
854 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000855 for entry in scheduler_models.HostQueueEntry.fetch(
856 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000857 task = entry.job.schedule_delayed_callback_task(entry)
858 if task:
showardd1195652009-12-08 22:21:02 +0000859 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000860
861
jadmanski0afbb632008-06-06 21:10:57 +0000862 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700863 """
864 Looks through the afe_host_queue_entries for an aborted entry.
865
866 The aborted bit is set on an HQE in many ways, the most common
867 being when a user requests an abort through the frontend, which
868 results in an rpc from the afe to abort_host_queue_entries.
869 """
jamesrene7c65cb2010-06-08 20:38:10 +0000870 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000871 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700872 where='aborted=1 and complete=0'):
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800873
874 # If the job is running on a shard, let the shard handle aborting
875 # it and sync back the right status.
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -0800876 if entry.job.shard_id is not None and not server_utils.is_shard():
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800877 logging.info('Waiting for shard %s to abort hqe %s',
878 entry.job.shard_id, entry)
879 continue
880
showardf4a2e502009-07-28 20:06:39 +0000881 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800882
883 # The task would have started off with both is_complete and
884 # is_active = False. Aborted tasks are neither active nor complete.
885 # For all currently active tasks this will happen through the agent,
886 # but we need to manually update the special tasks that haven't
887 # started yet, because they don't have agents.
888 models.SpecialTask.objects.filter(is_active=False,
889 queue_entry_id=entry.id).update(is_complete=True)
890
showardd3dc1992009-04-22 21:01:40 +0000891 for agent in self.get_agents_for_entry(entry):
892 agent.abort()
893 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000894 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700895 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000896 for job in jobs_to_stop:
897 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000898
899
beeps8bb1f7d2013-08-05 01:30:09 -0700900 def _find_aborted_special_tasks(self):
901 """
902 Find SpecialTasks that have been marked for abortion.
903
904 Poll the database looking for SpecialTasks that are active
905 and have been marked for abortion, then abort them.
906 """
907
908 # The completed and active bits are very important when it comes
909 # to scheduler correctness. The active bit is set through the prolog
910 # of a special task, and reset through the cleanup method of the
911 # SpecialAgentTask. The cleanup is called both through the abort and
912 # epilog. The complete bit is set in several places, and in general
913 # a hanging job will have is_active=1 is_complete=0, while a special
914 # task which completed will have is_active=0 is_complete=1. To check
915 # aborts we directly check active because the complete bit is set in
916 # several places, including the epilog of agent tasks.
917 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
918 is_aborted=True)
919 for task in aborted_tasks:
920 # There are 2 ways to get the agent associated with a task,
921 # through the host and through the hqe. A special task
922 # always needs a host, but doesn't always need a hqe.
923 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700924 if isinstance(agent.task, agent_task.SpecialAgentTask):
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000925
beeps8bb1f7d2013-08-05 01:30:09 -0700926 # The epilog preforms critical actions such as
927 # queueing the next SpecialTask, requeuing the
928 # hqe etc, however it doesn't actually kill the
929 # monitor process and set the 'done' bit. Epilogs
930 # assume that the job failed, and that the monitor
931 # process has already written an exit code. The
932 # done bit is a necessary condition for
933 # _handle_agents to schedule any more special
934 # tasks against the host, and it must be set
935 # in addition to is_active, is_complete and success.
936 agent.task.epilog()
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000937 agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700938
939
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700940 def _can_start_agent(self, agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000941 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000942 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000943 return True
944 # don't allow any nonzero-process agents to run after we've reached a
945 # limit (this avoids starvation of many-process agents)
946 if have_reached_limit:
947 return False
948 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000949 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000950 agent.task.owner_username,
951 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000952 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000953 return False
showard4c5374f2008-09-04 17:02:56 +0000954 return True
955
956
jadmanski0afbb632008-06-06 21:10:57 +0000957 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700958 """
959 Handles agents of the dispatcher.
960
961 Appropriate Agents are added to the dispatcher through
962 _schedule_running_host_queue_entries. These agents each
963 have a task. This method runs the agents task through
964 agent.tick() leading to:
965 agent.start
966 prolog -> AgentTasks prolog
967 For each queue entry:
968 sets host status/status to Running
969 set started_on in afe_host_queue_entries
970 run -> AgentTasks run
971 Creates PidfileRunMonitor
972 Queues the autoserv command line for this AgentTask
973 via the drone manager. These commands are executed
974 through the drone managers execute actions.
975 poll -> AgentTasks/BaseAgentTask poll
976 checks the monitors exit_code.
977 Executes epilog if task is finished.
978 Executes AgentTasks _finish_task
979 finish_task is usually responsible for setting the status
980 of the HQE/host, and updating it's active and complete fileds.
981
982 agent.is_done
983 Removed the agent from the dispatchers _agents queue.
984 Is_done checks the finished bit on the agent, that is
985 set based on the Agents task. During the agents poll
986 we check to see if the monitor process has exited in
987 it's finish method, and set the success member of the
988 task based on this exit code.
989 """
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700990 num_started_this_tick = 0
991 num_finished_this_tick = 0
showard4c5374f2008-09-04 17:02:56 +0000992 have_reached_limit = False
993 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -0700994 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +0000995 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -0700996 self._log_extra_msg('Processing Agent with Host Ids: %s and '
997 'queue_entry ids:%s' % (agent.host_ids,
998 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +0000999 if not agent.started:
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001000 if not self._can_start_agent(agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001001 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -07001002 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +00001003 continue
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001004 num_started_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001005 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +00001006 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -07001007 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +00001008 if agent.is_done():
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001009 num_finished_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001010 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +00001011 self.remove_agent(agent)
Gabe Black1e1c41b2015-02-04 23:55:15 -08001012 autotest_stats.Gauge('scheduler.jobs_per_tick').send(
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001013 'agents_started', num_started_this_tick)
Gabe Black1e1c41b2015-02-04 23:55:15 -08001014 autotest_stats.Gauge('scheduler.jobs_per_tick').send(
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001015 'agents_finished', num_finished_this_tick)
1016 logging.info('%d running processes. %d added this tick.',
Simran Basi3f6717d2012-09-13 15:21:22 -07001017 _drone_manager.total_running_processes(),
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001018 num_started_this_tick)
mbligh36768f02008-02-22 18:28:33 +00001019
1020
showard29f7cd22009-04-29 21:16:24 +00001021 def _process_recurring_runs(self):
1022 recurring_runs = models.RecurringRun.objects.filter(
1023 start_date__lte=datetime.datetime.now())
1024 for rrun in recurring_runs:
1025 # Create job from template
1026 job = rrun.job
1027 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001028 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001029
1030 host_objects = info['hosts']
1031 one_time_hosts = info['one_time_hosts']
1032 metahost_objects = info['meta_hosts']
1033 dependencies = info['dependencies']
1034 atomic_group = info['atomic_group']
1035
1036 for host in one_time_hosts or []:
1037 this_host = models.Host.create_one_time_host(host.hostname)
1038 host_objects.append(this_host)
1039
1040 try:
1041 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001042 options=options,
showard29f7cd22009-04-29 21:16:24 +00001043 host_objects=host_objects,
1044 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001045 atomic_group=atomic_group)
1046
1047 except Exception, ex:
1048 logging.exception(ex)
1049 #TODO send email
1050
1051 if rrun.loop_count == 1:
1052 rrun.delete()
1053 else:
1054 if rrun.loop_count != 0: # if not infinite loop
1055 # calculate new start_date
1056 difference = datetime.timedelta(seconds=rrun.loop_period)
1057 rrun.start_date = rrun.start_date + difference
1058 rrun.loop_count -= 1
1059 rrun.save()
1060
1061
Simran Basia858a232012-08-21 11:04:37 -07001062SiteDispatcher = utils.import_site_class(
1063 __file__, 'autotest_lib.scheduler.site_monitor_db',
1064 'SiteDispatcher', BaseDispatcher)
1065
1066class Dispatcher(SiteDispatcher):
1067 pass
1068
1069
mbligh36768f02008-02-22 18:28:33 +00001070class Agent(object):
showard77182562009-06-10 00:16:05 +00001071 """
Alex Miller47715eb2013-07-24 03:34:01 -07001072 An agent for use by the Dispatcher class to perform a task. An agent wraps
1073 around an AgentTask mainly to associate the AgentTask with the queue_entry
1074 and host ids.
showard77182562009-06-10 00:16:05 +00001075
1076 The following methods are required on all task objects:
1077 poll() - Called periodically to let the task check its status and
1078 update its internal state. If the task succeeded.
1079 is_done() - Returns True if the task is finished.
1080 abort() - Called when an abort has been requested. The task must
1081 set its aborted attribute to True if it actually aborted.
1082
1083 The following attributes are required on all task objects:
1084 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001085 success - bool, True if this task succeeded.
1086 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1087 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001088 """
1089
1090
showard418785b2009-11-23 20:19:59 +00001091 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001092 """
Alex Miller47715eb2013-07-24 03:34:01 -07001093 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001094 """
showard8cc058f2009-09-08 16:26:33 +00001095 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001096
showard77182562009-06-10 00:16:05 +00001097 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001098 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001099
showard8cc058f2009-09-08 16:26:33 +00001100 self.queue_entry_ids = task.queue_entry_ids
1101 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001102
showard8cc058f2009-09-08 16:26:33 +00001103 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001104 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001105
1106
jadmanski0afbb632008-06-06 21:10:57 +00001107 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001108 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001109 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001110 self.task.poll()
1111 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001112 self.finished = True
showardec113162008-05-08 00:52:49 +00001113
1114
jadmanski0afbb632008-06-06 21:10:57 +00001115 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001116 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001117
1118
showardd3dc1992009-04-22 21:01:40 +00001119 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001120 if self.task:
1121 self.task.abort()
1122 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001123 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001124 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001125
showardd3dc1992009-04-22 21:01:40 +00001126
beeps5e2bb4a2013-10-28 11:26:45 -07001127class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001128 """
1129 Common functionality for QueueTask and HostlessQueueTask
1130 """
1131 def __init__(self, queue_entries):
1132 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001133 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001134 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001135
1136
showard73ec0442009-02-07 02:05:20 +00001137 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001138 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001139
1140
jamesrenc44ae992010-02-19 00:12:54 +00001141 def _write_control_file(self, execution_path):
1142 control_path = _drone_manager.attach_file_to_execution(
1143 execution_path, self.job.control_file)
1144 return control_path
1145
1146
Aviv Keshet308e7362013-05-21 14:43:16 -07001147 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001148 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001149 execution_path = self.queue_entries[0].execution_path()
1150 control_path = self._write_control_file(execution_path)
1151 hostnames = ','.join(entry.host.hostname
1152 for entry in self.queue_entries
1153 if not entry.is_hostless())
1154
1155 execution_tag = self.queue_entries[0].execution_tag()
1156 params = _autoserv_command_line(
1157 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001158 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001159 _drone_manager.absolute_path(control_path)],
1160 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001161 if self.job.is_image_update_job():
1162 params += ['--image', self.job.update_image_path]
1163
jamesrenc44ae992010-02-19 00:12:54 +00001164 return params
showardd1195652009-12-08 22:21:02 +00001165
1166
    @property
    def num_processes(self):
        # One process slot per queue entry; used by the dispatcher's
        # throttling (_can_start_agent) and per-tick accounting.
        return len(self.queue_entries)
1170
1171
    @property
    def owner_username(self):
        # Attributes this task's processes to the job owner for per-user
        # process throttling in the drone manager.
        return self.job.owner
1175
1176
    def _working_directory(self):
        # Single execution path shared by all queue entries; presumably
        # raises/fails if the entries' paths disagree - see helper.
        return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001179
1180
jadmanski0afbb632008-06-06 21:10:57 +00001181 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001182 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001183 keyval_dict = self.job.keyval_dict()
1184 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001185 group_name = self.queue_entries[0].get_group_name()
1186 if group_name:
1187 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001188 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001189 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001190 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001191 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001192
1193
showard35162b02009-03-03 02:17:30 +00001194 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001195 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001196 _drone_manager.write_lines_to_file(error_file_path,
1197 [_LOST_PROCESS_ERROR])
1198
1199
showardd3dc1992009-04-22 21:01:40 +00001200 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001201 if not self.monitor:
1202 return
1203
showardd9205182009-04-27 20:09:55 +00001204 self._write_job_finished()
1205
showard35162b02009-03-03 02:17:30 +00001206 if self.monitor.lost_process:
1207 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001208
jadmanskif7fa2cc2008-10-01 14:13:23 +00001209
showardcbd74612008-11-19 21:42:02 +00001210 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001211 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001212 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001213 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001214 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001215
1216
jadmanskif7fa2cc2008-10-01 14:13:23 +00001217 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001218 if not self.monitor or not self.monitor.has_process():
1219 return
1220
jadmanskif7fa2cc2008-10-01 14:13:23 +00001221 # build up sets of all the aborted_by and aborted_on values
1222 aborted_by, aborted_on = set(), set()
1223 for queue_entry in self.queue_entries:
1224 if queue_entry.aborted_by:
1225 aborted_by.add(queue_entry.aborted_by)
1226 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1227 aborted_on.add(t)
1228
1229 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001230 # TODO(showard): this conditional is now obsolete, we just need to leave
1231 # it in temporarily for backwards compatibility over upgrades. delete
1232 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001233 assert len(aborted_by) <= 1
1234 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001235 aborted_by_value = aborted_by.pop()
1236 aborted_on_value = max(aborted_on)
1237 else:
1238 aborted_by_value = 'autotest_system'
1239 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001240
showarda0382352009-02-11 23:36:43 +00001241 self._write_keyval_after_job("aborted_by", aborted_by_value)
1242 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001243
showardcbd74612008-11-19 21:42:02 +00001244 aborted_on_string = str(datetime.datetime.fromtimestamp(
1245 aborted_on_value))
1246 self._write_status_comment('Job aborted by %s on %s' %
1247 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001248
1249
    def abort(self):
        """Abort the task: stop the process, then finalize results.

        After the superclass kills the autoserv process, record the
        aborted_by/aborted_on keyvals and write end-of-job bookkeeping.
        """
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()
showard21baa452008-10-21 00:08:39 +00001254
1255
    def epilog(self):
        """Run the superclass epilog, then write end-of-job bookkeeping."""
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001259
1260
class QueueTask(AbstractQueueTask):
    """Queue task for a job that runs against one or more hosts."""

    def __init__(self, queue_entries):
        """@param queue_entries: HostQueueEntry objects this task will run."""
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        # Sanity-check DB state before starting: entries must be
        # Starting/Running and their hosts Pending/Running.
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.host.set_status(models.Host.Status.RUNNING)
            # Mark the host dirty so cleanup runs after the job finishes.
            queue_entry.host.update_field('dirty', 1)


    def _finish_task(self):
        super(QueueTask, self)._finish_task()

        for queue_entry in self.queue_entries:
            # Entries move to GATHERING (log/crashinfo collection); hosts stay
            # RUNNING until gathering and cleanup complete.
            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
            queue_entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        invocation = super(QueueTask, self)._command_line()
        # Check if server-side packaging is needed.  require_ssp is tri-state
        # (None/True/False); only an explicit False opts out, so compare with
        # `is not False` rather than equality against the singleton.
        if (_enable_ssp_container and
            self.job.control_type == control_data.CONTROL_TYPE.SERVER and
            self.job.require_ssp is not False):
            invocation += ['--require-ssp']
        return invocation + ['--verify_job_repo_url']
1299
1300
class HostlessQueueTask(AbstractQueueTask):
    """Queue task for a server job that does not lock any hosts."""

    def __init__(self, queue_entry):
        """@param queue_entry: the single HostQueueEntry for this hostless job."""
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()

        # A new job starts out in status Starting.  Each scheduler tick tries
        # to start such jobs; if a limit (e.g. max_hostless_jobs_per_drone) is
        # hit they remain Starting, otherwise they become Running and an
        # autoserv process is launched on a drone.
        #
        # If the entry is still Starting here, no process ever ran, so there
        # are no logs to parse/collect -- and attempting to parse would raise,
        # because execution_subdir has no value yet.
        entry = self.queue_entries[0]
        if entry.status != models.HostQueueEntry.Status.STARTING:
            entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001328
1329
if __name__ == '__main__':
    # Script entry point: run the scheduler main loop (defined earlier in
    # this file).
    main()