blob: a29023a0bc1ef13603e5a88bda824553ae7d9938 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Aviv Keshet225bdfe2013-03-05 10:10:08 -08002#pylint: disable-msg=C0111
mbligh36768f02008-02-22 18:28:33 +00003
4"""
5Autotest scheduler
6"""
showard909c7a62008-07-15 21:52:38 +00007
Dan Shif6c65bd2014-08-29 16:15:07 -07008import datetime
9import gc
10import logging
11import optparse
12import os
13import signal
14import sys
15import time
showard402934a2009-12-21 22:20:47 +000016
Alex Miller05d7b4c2013-03-04 07:49:38 -080017import common
showard21baa452008-10-21 00:08:39 +000018from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000019
20import django.db
21
Dan Shiec1d47d2015-02-13 11:38:13 -080022from autotest_lib.client.common_lib import control_data
Prashanth B0e960282014-05-13 19:38:28 -070023from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070024from autotest_lib.client.common_lib import utils
Gabe Black1e1c41b2015-02-04 23:55:15 -080025from autotest_lib.client.common_lib.cros.graphite import autotest_stats
Prashanth B0e960282014-05-13 19:38:28 -070026from autotest_lib.frontend.afe import models, rpc_utils
Fang Dengc330bee2014-10-21 18:10:55 -070027from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070028from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
29from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070030from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070031from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070032from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000033from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080034from autotest_lib.scheduler import status_server, scheduler_config
Prashanth Bf66d51b2014-05-06 12:42:25 -070035from autotest_lib.scheduler import scheduler_lib
Aviv Keshet308e7362013-05-21 14:43:16 -070036from autotest_lib.server import autoserv_utils
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080037from autotest_lib.server import utils as server_utils
Dan Shib9144a42014-12-01 16:09:32 -080038from autotest_lib.site_utils import server_manager_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080039
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the environment to relocate the autotest tree.  Use the `in`
# operator instead of dict.has_key(), which is deprecated and removed in
# Python 3.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level singletons; populated in initialize().
_db_manager = None
_db = None
# Set True by handle_signal() to stop the main loop.
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None

# Whether this scheduler acquires/releases hosts inline with each tick
# (see BaseDispatcher.__init__ for how this selects the host scheduler).
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)

_enable_ssp_container = global_config.global_config.get_config_value(
        'AUTOSERV', 'enable_ssp_container', type=bool,
        default=True)
mbligh36768f02008-02-22 18:28:33 +000077
mbligh83c1e9e2009-05-01 23:10:41 +000078def _site_init_monitor_db_dummy():
79 return {}
80
81
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled without a default."""
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise scheduler_lib.SchedulerError(
            'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000087
88
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler."""
    # Currently the only consistency check is the drone-set configuration.
    _verify_default_drone_set_exists()
92
93
def main():
    """Run the scheduler, guaranteeing pid-file cleanup on every exit path.

    Exceptions other than SystemExit escaping the scheduler are logged
    before being re-raised, so crashes always leave a trace in the log.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # Deliberate exits (usage errors, startup aborts) are not crashes;
            # let them pass through unlogged.
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        # The pid file is written by initialize(); remove it however we exit.
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000105
106
def main_without_exception_handling():
    """Parse arguments, initialize the scheduler, and run the tick loop.

    Expects exactly one positional argument: the results directory.
    Exits early on usage errors, and via sys.exit(1) when the scheduler is
    disabled in the global config.  On an uncaught exception from the loop
    a stacktrace email is queued; in all cases the status server, drone
    manager and database connection are shut down at the end.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Optional site-specific hook; falls back to the no-op dummy above.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # NOTE(review): _shutdown_scheduler is a private attribute of
        # StatusServer; the loop also stops when the status server asks.
        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except Exception:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Shutdown path — reached both on clean exit and after a logged crash.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000183
184
def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler: flag the main loop for a clean shutdown.

    @param signum: signal number received (unused).
    @param frame: interrupted stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000189
190
def initialize():
    """One-time scheduler startup: pid file, DB, signals, drone manager.

    Aborts (sys.exit) if another monitor_db is already running.  Mutates
    the module globals _db_manager, _db and (via initialize_globals)
    _drone_manager.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start a second scheduler against the same results dir.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point at the stress-test database instead of the real one.
        global_config.global_config.override_config_value(
            scheduler_lib.DB_CONFIG_SECTION, 'database',
            'stresstest_autotest_web')

    # If server database is enabled, check if the server has role `scheduler`.
    # If the server does not have scheduler role, exception will be raised and
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    # Drone list comes from the server DB when enabled, else from config.
    if server_manager_utils.use_server_db():
        drone_list = server_manager_utils.get_drones()
    else:
        drones = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
        drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000235
236
def initialize_globals():
    """Bind the module-level _drone_manager to the process-wide singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
240
241
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line for a job or queue entry.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @returns The autoserv command line as a list of executable + parameters.
    """
    return autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
showard87ba02a2009-04-20 19:37:32 +0000259
260
Simran Basia858a232012-08-21 11:04:37 -0700261class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800262
263
    def __init__(self):
        """Set up dispatcher state, cleanup tasks, and the host scheduler."""
        # Active Agent objects being driven by _handle_agents().
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db)
        # Indexes from host id / queue-entry id to the set of agents
        # touching that object; maintained by add_agent_task/remove_agent.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        # Interval between garbage-collector stat reports, in seconds.
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
295
mbligh36768f02008-02-22 18:28:33 +0000296
showard915958d2009-04-22 21:00:58 +0000297 def initialize(self, recover_hosts=True):
298 self._periodic_cleanup.initialize()
299 self._24hr_upkeep.initialize()
300
jadmanski0afbb632008-06-06 21:10:57 +0000301 # always recover processes
302 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000303
jadmanski0afbb632008-06-06 21:10:57 +0000304 if recover_hosts:
305 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000306
307
Simran Basi0ec94dd2012-08-28 09:50:10 -0700308 def _log_tick_msg(self, msg):
309 if self._tick_debug:
310 logging.debug(msg)
311
312
Simran Basidef92872012-09-20 13:34:34 -0700313 def _log_extra_msg(self, msg):
314 if self._extra_debugging:
315 logging.debug(msg)
316
317
    def tick(self):
        """Run one full scheduler cycle.

        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.  The step order is significant; do not reorder
        without understanding the refresh/schedule/abort pipeline.
        """
        timer = autotest_stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
        _drone_manager.trigger_refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        # The refresh triggered above is collected here, after scheduling.
        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
        _drone_manager.sync_refresh()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000361
showard97aed502008-11-04 02:01:24 +0000362
mblighf3294cc2009-04-08 21:17:38 +0000363 def _run_cleanup(self):
364 self._periodic_cleanup.run_cleanup_maybe()
365 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000366
mbligh36768f02008-02-22 18:28:33 +0000367
showardf13a9e22009-12-18 22:54:09 +0000368 def _garbage_collection(self):
369 threshold_time = time.time() - self._seconds_between_garbage_stats
370 if threshold_time < self._last_garbage_stats_time:
371 # Don't generate these reports very often.
372 return
373
374 self._last_garbage_stats_time = time.time()
375 # Force a full level 0 collection (because we can, it doesn't hurt
376 # at this interval).
377 gc.collect()
378 logging.info('Logging garbage collector stats on tick %d.',
379 self._tick_count)
380 gc_stats._log_garbage_collector_stats()
381
382
showard170873e2009-01-07 00:22:26 +0000383 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
384 for object_id in object_ids:
385 agent_dict.setdefault(object_id, set()).add(agent)
386
387
388 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
389 for object_id in object_ids:
390 assert object_id in agent_dict
391 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700392 # If an ID has no more active agent associated, there is no need to
393 # keep it in the dictionary. Otherwise, scheduler will keep an
394 # unnecessarily big dictionary until being restarted.
395 if not agent_dict[object_id]:
396 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000397
398
showardd1195652009-12-08 22:21:02 +0000399 def add_agent_task(self, agent_task):
beeps8bb1f7d2013-08-05 01:30:09 -0700400 """
401 Creates and adds an agent to the dispatchers list.
402
403 In creating the agent we also pass on all the queue_entry_ids and
404 host_ids from the special agent task. For every agent we create, we
405 add it to 1. a dict against the queue_entry_ids given to it 2. A dict
406 against the host_ids given to it. So theoritically, a host can have any
407 number of agents associated with it, and each of them can have any
408 special agent task, though in practice we never see > 1 agent/task per
409 host at any time.
410
411 @param agent_task: A SpecialTask for the agent to manage.
412 """
showardd1195652009-12-08 22:21:02 +0000413 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000414 self._agents.append(agent)
415 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000416 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
417 self._register_agent_for_ids(self._queue_entry_agents,
418 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000419
showard170873e2009-01-07 00:22:26 +0000420
421 def get_agents_for_entry(self, queue_entry):
422 """
423 Find agents corresponding to the specified queue_entry.
424 """
showardd3dc1992009-04-22 21:01:40 +0000425 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000426
427
428 def host_has_agent(self, host):
429 """
430 Determine if there is currently an Agent present using this host.
431 """
432 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000433
434
jadmanski0afbb632008-06-06 21:10:57 +0000435 def remove_agent(self, agent):
436 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000437 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
438 agent)
439 self._unregister_agent_for_ids(self._queue_entry_agents,
440 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000441
442
showard8cc058f2009-09-08 16:26:33 +0000443 def _host_has_scheduled_special_task(self, host):
444 return bool(models.SpecialTask.objects.filter(host__id=host.id,
445 is_active=False,
446 is_complete=False))
447
448
    def _recover_processes(self):
        """Recover autoserv/special-task processes after a scheduler restart.

        NOTE(review): ordering looks intentional — pidfiles are registered
        before the drone-manager refresh, presumably so the refresh collects
        their contents; confirm before reordering.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000461
showard170873e2009-01-07 00:22:26 +0000462
showardd1195652009-12-08 22:21:02 +0000463 def _create_recovery_agent_tasks(self):
464 return (self._get_queue_entry_agent_tasks()
465 + self._get_special_task_agent_tasks(is_active=True))
466
467
468 def _get_queue_entry_agent_tasks(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700469 """
470 Get agent tasks for all hqe in the specified states.
471
472 Loosely this translates to taking a hqe in one of the specified states,
473 say parsing, and getting an AgentTask for it, like the FinalReparseTask,
474 through _get_agent_task_for_queue_entry. Each queue entry can only have
475 one agent task at a time, but there might be multiple queue entries in
476 the group.
477
478 @return: A list of AgentTasks.
479 """
showardd1195652009-12-08 22:21:02 +0000480 # host queue entry statuses handled directly by AgentTasks (Verifying is
481 # handled through SpecialTasks, so is not listed here)
482 statuses = (models.HostQueueEntry.Status.STARTING,
483 models.HostQueueEntry.Status.RUNNING,
484 models.HostQueueEntry.Status.GATHERING,
mbligh4608b002010-01-05 18:22:35 +0000485 models.HostQueueEntry.Status.PARSING,
486 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000487 status_list = ','.join("'%s'" % status for status in statuses)
jamesrenc44ae992010-02-19 00:12:54 +0000488 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardd1195652009-12-08 22:21:02 +0000489 where='status IN (%s)' % status_list)
Gabe Black1e1c41b2015-02-04 23:55:15 -0800490 autotest_stats.Gauge('scheduler.jobs_per_tick').send(
Alex Miller47cd2472013-11-25 15:20:04 -0800491 'running', len(queue_entries))
showardd1195652009-12-08 22:21:02 +0000492
493 agent_tasks = []
494 used_queue_entries = set()
495 for entry in queue_entries:
496 if self.get_agents_for_entry(entry):
497 # already being handled
498 continue
499 if entry in used_queue_entries:
500 # already picked up by a synchronous job
501 continue
502 agent_task = self._get_agent_task_for_queue_entry(entry)
503 agent_tasks.append(agent_task)
504 used_queue_entries.update(agent_task.queue_entries)
505 return agent_tasks
showard170873e2009-01-07 00:22:26 +0000506
507
showardd1195652009-12-08 22:21:02 +0000508 def _get_special_task_agent_tasks(self, is_active=False):
509 special_tasks = models.SpecialTask.objects.filter(
510 is_active=is_active, is_complete=False)
511 return [self._get_agent_task_for_special_task(task)
512 for task in special_tasks]
513
514
515 def _get_agent_task_for_queue_entry(self, queue_entry):
516 """
beeps8bb1f7d2013-08-05 01:30:09 -0700517 Construct an AgentTask instance for the given active HostQueueEntry.
518
showardd1195652009-12-08 22:21:02 +0000519 @param queue_entry: a HostQueueEntry
beeps8bb1f7d2013-08-05 01:30:09 -0700520 @return: an AgentTask to run the queue entry
showardd1195652009-12-08 22:21:02 +0000521 """
522 task_entries = queue_entry.job.get_group_entries(queue_entry)
523 self._check_for_duplicate_host_entries(task_entries)
524
525 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
526 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000527 if queue_entry.is_hostless():
528 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000529 return QueueTask(queue_entries=task_entries)
530 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
beeps5e2bb4a2013-10-28 11:26:45 -0700531 return postjob_task.GatherLogsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000532 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
beeps5e2bb4a2013-10-28 11:26:45 -0700533 return postjob_task.FinalReparseTask(queue_entries=task_entries)
mbligh4608b002010-01-05 18:22:35 +0000534 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
beeps5e2bb4a2013-10-28 11:26:45 -0700535 return postjob_task.ArchiveResultsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000536
Prashanth B0e960282014-05-13 19:38:28 -0700537 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800538 '_get_agent_task_for_queue_entry got entry with '
539 'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000540
541
542 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000543 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
544 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000545 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000546 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000547 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000548 if using_host:
showardd1195652009-12-08 22:21:02 +0000549 self._assert_host_has_no_agent(task_entry)
550
551
552 def _assert_host_has_no_agent(self, entry):
553 """
554 @param entry: a HostQueueEntry or a SpecialTask
555 """
556 if self.host_has_agent(entry.host):
557 agent = tuple(self._host_agents.get(entry.host.id))[0]
Prashanth B0e960282014-05-13 19:38:28 -0700558 raise scheduler_lib.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000559 'While scheduling %s, host %s already has a host agent %s'
560 % (entry, entry.host, agent.task))
561
562
563 def _get_agent_task_for_special_task(self, special_task):
564 """
565 Construct an AgentTask class to run the given SpecialTask and add it
566 to this dispatcher.
beeps8bb1f7d2013-08-05 01:30:09 -0700567
MK Ryu35d661e2014-09-25 17:44:10 -0700568 A special task is created through schedule_special_tasks, but only if
beeps8bb1f7d2013-08-05 01:30:09 -0700569 the host doesn't already have an agent. This happens through
570 add_agent_task. All special agent tasks are given a host on creation,
571 and a Null hqe. To create a SpecialAgentTask object, you need a
572 models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
573 object contains a hqe it's passed on to the special agent task, which
574 creates a HostQueueEntry and saves it as it's queue_entry.
575
showardd1195652009-12-08 22:21:02 +0000576 @param special_task: a models.SpecialTask instance
577 @returns an AgentTask to run this SpecialTask
578 """
579 self._assert_host_has_no_agent(special_task)
580
beeps5e2bb4a2013-10-28 11:26:45 -0700581 special_agent_task_classes = (prejob_task.CleanupTask,
582 prejob_task.VerifyTask,
583 prejob_task.RepairTask,
584 prejob_task.ResetTask,
585 prejob_task.ProvisionTask)
586
showardd1195652009-12-08 22:21:02 +0000587 for agent_task_class in special_agent_task_classes:
588 if agent_task_class.TASK_TYPE == special_task.task:
589 return agent_task_class(task=special_task)
590
Prashanth B0e960282014-05-13 19:38:28 -0700591 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800592 'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000593
594
595 def _register_pidfiles(self, agent_tasks):
596 for agent_task in agent_tasks:
597 agent_task.register_necessary_pidfiles()
598
599
600 def _recover_tasks(self, agent_tasks):
601 orphans = _drone_manager.get_orphaned_autoserv_processes()
602
603 for agent_task in agent_tasks:
604 agent_task.recover()
605 if agent_task.monitor and agent_task.monitor.has_process():
606 orphans.discard(agent_task.monitor.get_process())
607 self.add_agent_task(agent_task)
608
609 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000610
611
showard8cc058f2009-09-08 16:26:33 +0000612 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000613 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
614 % status):
showard0db3d432009-10-12 20:29:15 +0000615 if entry.status == status and not self.get_agents_for_entry(entry):
616 # The status can change during iteration, e.g., if job.run()
617 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000618 yield entry
619
620
showard6878e8b2009-07-20 22:37:45 +0000621 def _check_for_remaining_orphan_processes(self, orphans):
622 if not orphans:
623 return
624 subject = 'Unrecovered orphan autoserv processes remain'
625 message = '\n'.join(str(process) for process in orphans)
626 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000627
628 die_on_orphans = global_config.global_config.get_config_value(
629 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
630
631 if die_on_orphans:
632 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000633
showard170873e2009-01-07 00:22:26 +0000634
showard8cc058f2009-09-08 16:26:33 +0000635 def _recover_pending_entries(self):
636 for entry in self._get_unassigned_entries(
637 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000638 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000639 entry.on_pending()
640
641
showardb8900452009-10-12 20:31:01 +0000642 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000643 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000644 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
645 unrecovered_hqes = []
646 for queue_entry in queue_entries:
647 special_tasks = models.SpecialTask.objects.filter(
648 task__in=(models.SpecialTask.Task.CLEANUP,
649 models.SpecialTask.Task.VERIFY),
650 queue_entry__id=queue_entry.id,
651 is_complete=False)
652 if special_tasks.count() == 0:
653 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000654
showardb8900452009-10-12 20:31:01 +0000655 if unrecovered_hqes:
656 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700657 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000658 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000659 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000660
661
showard65db3932009-10-28 19:54:35 +0000662 def _schedule_special_tasks(self):
663 """
664 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700665
666 Special tasks include PreJobTasks like verify, reset and cleanup.
667 They are created through _schedule_new_jobs and associated with a hqe
668 This method translates SpecialTasks to the appropriate AgentTask and
669 adds them to the dispatchers agents list, so _handle_agents can execute
670 them.
showard65db3932009-10-28 19:54:35 +0000671 """
Prashanth B4ec98672014-05-15 10:44:54 -0700672 # When the host scheduler is responsible for acquisition we only want
673 # to run tasks with leased hosts. All hqe tasks will already have
674 # leased hosts, and we don't want to run frontend tasks till the host
675 # scheduler has vetted the assignment. Note that this doesn't include
676 # frontend tasks with hosts leased by other active hqes.
677 for task in self._job_query_manager.get_prioritized_special_tasks(
678 only_tasks_with_leased_hosts=not _inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000679 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000680 continue
showardd1195652009-12-08 22:21:02 +0000681 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000682
683
showard170873e2009-01-07 00:22:26 +0000684 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000685 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000686 # should never happen
showarded2afea2009-07-07 20:54:07 +0000687 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000688 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000689 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700690 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000691 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000692
693
jadmanski0afbb632008-06-06 21:10:57 +0000694 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000695 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700696 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000697 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000698 if self.host_has_agent(host):
699 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000700 continue
showard8cc058f2009-09-08 16:26:33 +0000701 if self._host_has_scheduled_special_task(host):
702 # host will have a special task scheduled on the next cycle
703 continue
showard170873e2009-01-07 00:22:26 +0000704 if print_message:
showardb18134f2009-03-20 20:52:18 +0000705 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000706 models.SpecialTask.objects.create(
707 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000708 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000709
710
jadmanski0afbb632008-06-06 21:10:57 +0000711 def _recover_hosts(self):
712 # recover "Repair Failed" hosts
713 message = 'Reverifying dead host %s'
714 self._reverify_hosts_where("status = 'Repair Failed'",
715 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000716
717
showard89f84db2009-03-12 20:39:13 +0000718 def _refresh_pending_queue_entries(self):
719 """
720 Lookup the pending HostQueueEntries and call our HostScheduler
721 refresh() method given that list. Return the list.
722
723 @returns A list of pending HostQueueEntries sorted in priority order.
724 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700725 queue_entries = self._job_query_manager.get_pending_queue_entries(
726 only_hostless=not _inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000727 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000728 return []
showard89f84db2009-03-12 20:39:13 +0000729 return queue_entries
730
731
showarda9545c02009-12-18 22:44:26 +0000732 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800733 """Schedule a hostless (suite) job.
734
735 @param queue_entry: The queue_entry representing the hostless job.
736 """
showarda9545c02009-12-18 22:44:26 +0000737 self.add_agent_task(HostlessQueueTask(queue_entry))
Jakob Juelichd615a1e2014-09-04 11:48:36 -0700738
739 # Need to set execution_subdir before setting the status:
740 # After a restart of the scheduler, agents will be restored for HQEs in
741 # Starting, Running, Gathering, Parsing or Archiving. To do this, the
742 # execution_subdir is needed. Therefore it must be set before entering
743 # one of these states.
744 # Otherwise, if the scheduler was interrupted between setting the status
745 # and the execution_subdir, upon it's restart restoring agents would
746 # fail.
747 # Is there a way to get a status in one of these states without going
748 # through this code? Following cases are possible:
749 # - If it's aborted before being started:
750 # active bit will be 0, so there's nothing to parse, it will just be
751 # set to completed by _find_aborting. Critical statuses are skipped.
752 # - If it's aborted or it fails after being started:
753 # It was started, so this code was executed.
754 queue_entry.update_field('execution_subdir', 'hostless')
jamesren47bd7372010-03-13 00:58:17 +0000755 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000756
757
beepscc9fc702013-12-02 12:45:38 -0800758 def _schedule_host_job(self, host, queue_entry):
759 """Schedules a job on the given host.
760
761 1. Assign the host to the hqe, if it isn't already assigned.
762 2. Create a SpecialAgentTask for the hqe.
763 3. Activate the hqe.
764
765 @param queue_entry: The job to schedule.
766 @param host: The host to schedule the job on.
767 """
768 if self.host_has_agent(host):
769 host_agent_task = list(self._host_agents.get(host.id))[0].task
770 subject = 'Host with agents assigned to an HQE'
771 message = ('HQE: %s assigned host %s, but the host has '
772 'agent: %s for queue_entry %s. The HQE '
beepsf7d35162014-02-13 16:42:13 -0800773 'will have to try and acquire a host next tick ' %
beepscc9fc702013-12-02 12:45:38 -0800774 (queue_entry, host.hostname, host_agent_task,
775 host_agent_task.queue_entry))
776 email_manager.manager.enqueue_notify_email(subject, message)
beepscc9fc702013-12-02 12:45:38 -0800777 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700778 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800779
780
showard89f84db2009-03-12 20:39:13 +0000781 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700782 """
783 Find any new HQEs and call schedule_pre_job_tasks for it.
784
785 This involves setting the status of the HQE and creating a row in the
786 db corresponding the the special task, through
787 scheduler_models._queue_special_task. The new db row is then added as
788 an agent to the dispatcher through _schedule_special_tasks and
789 scheduled for execution on the drone through _handle_agents.
790 """
showard89f84db2009-03-12 20:39:13 +0000791 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000792
beepscc9fc702013-12-02 12:45:38 -0800793 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700794 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700795 new_jobs_with_hosts = 0
796 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800797 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700798 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000799
beepscc9fc702013-12-02 12:45:38 -0800800 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000801 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000802 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700803 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000804 else:
beepscc9fc702013-12-02 12:45:38 -0800805 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700806 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700807
Gabe Black1e1c41b2015-02-04 23:55:15 -0800808 autotest_stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
beepscc9fc702013-12-02 12:45:38 -0800809 if not host_jobs:
810 return
Prashanth Bf66d51b2014-05-06 12:42:25 -0700811 if not _inline_host_acquisition:
812 message = ('Found %s jobs that need hosts though '
813 '_inline_host_acquisition=%s. Will acquire hosts.' %
814 ([str(job) for job in host_jobs],
815 _inline_host_acquisition))
816 email_manager.manager.enqueue_notify_email(
817 'Processing unexpected host acquisition requests', message)
818 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
819 for host_assignment in jobs_with_hosts:
820 self._schedule_host_job(host_assignment.host, host_assignment.job)
821 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800822
Gabe Black1e1c41b2015-02-04 23:55:15 -0800823 autotest_stats.Gauge(key).send('new_jobs_with_hosts',
824 new_jobs_with_hosts)
825 autotest_stats.Gauge(key).send('new_jobs_without_hosts',
826 new_jobs_need_hosts -
827 new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000828
829
showard8cc058f2009-09-08 16:26:33 +0000830 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700831 """
832 Adds agents to the dispatcher.
833
834 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
835 QueueTask for example, will have a job with a control file, and
836 the agent will have methods that poll, abort and check if the queue
837 task is finished. The dispatcher runs the agent_task, as well as
838 other agents in it's _agents member, through _handle_agents, by
839 calling the Agents tick().
840
841 This method creates an agent for each HQE in one of (starting, running,
842 gathering, parsing, archiving) states, and adds it to the dispatcher so
843 it is handled by _handle_agents.
844 """
showardd1195652009-12-08 22:21:02 +0000845 for agent_task in self._get_queue_entry_agent_tasks():
846 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000847
848
849 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000850 for entry in scheduler_models.HostQueueEntry.fetch(
851 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000852 task = entry.job.schedule_delayed_callback_task(entry)
853 if task:
showardd1195652009-12-08 22:21:02 +0000854 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000855
856
jadmanski0afbb632008-06-06 21:10:57 +0000857 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700858 """
859 Looks through the afe_host_queue_entries for an aborted entry.
860
861 The aborted bit is set on an HQE in many ways, the most common
862 being when a user requests an abort through the frontend, which
863 results in an rpc from the afe to abort_host_queue_entries.
864 """
jamesrene7c65cb2010-06-08 20:38:10 +0000865 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000866 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700867 where='aborted=1 and complete=0'):
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800868
869 # If the job is running on a shard, let the shard handle aborting
870 # it and sync back the right status.
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -0800871 if entry.job.shard_id is not None and not server_utils.is_shard():
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800872 logging.info('Waiting for shard %s to abort hqe %s',
873 entry.job.shard_id, entry)
874 continue
875
showardf4a2e502009-07-28 20:06:39 +0000876 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800877
878 # The task would have started off with both is_complete and
879 # is_active = False. Aborted tasks are neither active nor complete.
880 # For all currently active tasks this will happen through the agent,
881 # but we need to manually update the special tasks that haven't
882 # started yet, because they don't have agents.
883 models.SpecialTask.objects.filter(is_active=False,
884 queue_entry_id=entry.id).update(is_complete=True)
885
showardd3dc1992009-04-22 21:01:40 +0000886 for agent in self.get_agents_for_entry(entry):
887 agent.abort()
888 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000889 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700890 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000891 for job in jobs_to_stop:
892 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000893
894
beeps8bb1f7d2013-08-05 01:30:09 -0700895 def _find_aborted_special_tasks(self):
896 """
897 Find SpecialTasks that have been marked for abortion.
898
899 Poll the database looking for SpecialTasks that are active
900 and have been marked for abortion, then abort them.
901 """
902
903 # The completed and active bits are very important when it comes
904 # to scheduler correctness. The active bit is set through the prolog
905 # of a special task, and reset through the cleanup method of the
906 # SpecialAgentTask. The cleanup is called both through the abort and
907 # epilog. The complete bit is set in several places, and in general
908 # a hanging job will have is_active=1 is_complete=0, while a special
909 # task which completed will have is_active=0 is_complete=1. To check
910 # aborts we directly check active because the complete bit is set in
911 # several places, including the epilog of agent tasks.
912 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
913 is_aborted=True)
914 for task in aborted_tasks:
915 # There are 2 ways to get the agent associated with a task,
916 # through the host and through the hqe. A special task
917 # always needs a host, but doesn't always need a hqe.
918 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700919 if isinstance(agent.task, agent_task.SpecialAgentTask):
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000920
beeps8bb1f7d2013-08-05 01:30:09 -0700921 # The epilog preforms critical actions such as
922 # queueing the next SpecialTask, requeuing the
923 # hqe etc, however it doesn't actually kill the
924 # monitor process and set the 'done' bit. Epilogs
925 # assume that the job failed, and that the monitor
926 # process has already written an exit code. The
927 # done bit is a necessary condition for
928 # _handle_agents to schedule any more special
929 # tasks against the host, and it must be set
930 # in addition to is_active, is_complete and success.
931 agent.task.epilog()
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000932 agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700933
934
showard324bf812009-01-20 23:23:38 +0000935 def _can_start_agent(self, agent, num_started_this_cycle,
936 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000937 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000938 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000939 return True
940 # don't allow any nonzero-process agents to run after we've reached a
941 # limit (this avoids starvation of many-process agents)
942 if have_reached_limit:
943 return False
944 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000945 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000946 agent.task.owner_username,
947 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000948 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000949 return False
950 # if a single agent exceeds the per-cycle throttling, still allow it to
951 # run when it's the first agent in the cycle
952 if num_started_this_cycle == 0:
953 return True
954 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +0000955 if (num_started_this_cycle + agent.task.num_processes >
956 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000957 return False
958 return True
959
960
jadmanski0afbb632008-06-06 21:10:57 +0000961 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700962 """
963 Handles agents of the dispatcher.
964
965 Appropriate Agents are added to the dispatcher through
966 _schedule_running_host_queue_entries. These agents each
967 have a task. This method runs the agents task through
968 agent.tick() leading to:
969 agent.start
970 prolog -> AgentTasks prolog
971 For each queue entry:
972 sets host status/status to Running
973 set started_on in afe_host_queue_entries
974 run -> AgentTasks run
975 Creates PidfileRunMonitor
976 Queues the autoserv command line for this AgentTask
977 via the drone manager. These commands are executed
978 through the drone managers execute actions.
979 poll -> AgentTasks/BaseAgentTask poll
980 checks the monitors exit_code.
981 Executes epilog if task is finished.
982 Executes AgentTasks _finish_task
983 finish_task is usually responsible for setting the status
984 of the HQE/host, and updating it's active and complete fileds.
985
986 agent.is_done
987 Removed the agent from the dispatchers _agents queue.
988 Is_done checks the finished bit on the agent, that is
989 set based on the Agents task. During the agents poll
990 we check to see if the monitor process has exited in
991 it's finish method, and set the success member of the
992 task based on this exit code.
993 """
jadmanski0afbb632008-06-06 21:10:57 +0000994 num_started_this_cycle = 0
Prashanth Balasubramanian628bfcf2014-10-02 12:44:13 -0700995 num_finished_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000996 have_reached_limit = False
997 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -0700998 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +0000999 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -07001000 self._log_extra_msg('Processing Agent with Host Ids: %s and '
1001 'queue_entry ids:%s' % (agent.host_ids,
1002 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +00001003 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001004 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001005 have_reached_limit):
1006 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -07001007 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +00001008 continue
showardd1195652009-12-08 22:21:02 +00001009 num_started_this_cycle += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001010 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +00001011 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -07001012 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +00001013 if agent.is_done():
Prashanth Balasubramanian628bfcf2014-10-02 12:44:13 -07001014 num_finished_this_cycle += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001015 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +00001016 self.remove_agent(agent)
Gabe Black1e1c41b2015-02-04 23:55:15 -08001017 autotest_stats.Gauge('scheduler.jobs_per_tick').send(
Prashanth Balasubramanian628bfcf2014-10-02 12:44:13 -07001018 'agents_started', num_started_this_cycle)
Gabe Black1e1c41b2015-02-04 23:55:15 -08001019 autotest_stats.Gauge('scheduler.jobs_per_tick').send(
Prashanth Balasubramanian628bfcf2014-10-02 12:44:13 -07001020 'agents_finished', num_finished_this_cycle)
Simran Basi3f6717d2012-09-13 15:21:22 -07001021 logging.info('%d running processes. %d added this cycle.',
1022 _drone_manager.total_running_processes(),
1023 num_started_this_cycle)
mbligh36768f02008-02-22 18:28:33 +00001024
1025
showard29f7cd22009-04-29 21:16:24 +00001026 def _process_recurring_runs(self):
1027 recurring_runs = models.RecurringRun.objects.filter(
1028 start_date__lte=datetime.datetime.now())
1029 for rrun in recurring_runs:
1030 # Create job from template
1031 job = rrun.job
1032 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001033 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001034
1035 host_objects = info['hosts']
1036 one_time_hosts = info['one_time_hosts']
1037 metahost_objects = info['meta_hosts']
1038 dependencies = info['dependencies']
1039 atomic_group = info['atomic_group']
1040
1041 for host in one_time_hosts or []:
1042 this_host = models.Host.create_one_time_host(host.hostname)
1043 host_objects.append(this_host)
1044
1045 try:
1046 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001047 options=options,
showard29f7cd22009-04-29 21:16:24 +00001048 host_objects=host_objects,
1049 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001050 atomic_group=atomic_group)
1051
1052 except Exception, ex:
1053 logging.exception(ex)
1054 #TODO send email
1055
1056 if rrun.loop_count == 1:
1057 rrun.delete()
1058 else:
1059 if rrun.loop_count != 0: # if not infinite loop
1060 # calculate new start_date
1061 difference = datetime.timedelta(seconds=rrun.loop_period)
1062 rrun.start_date = rrun.start_date + difference
1063 rrun.loop_count -= 1
1064 rrun.save()
1065
1066
Simran Basia858a232012-08-21 11:04:37 -07001067SiteDispatcher = utils.import_site_class(
1068 __file__, 'autotest_lib.scheduler.site_monitor_db',
1069 'SiteDispatcher', BaseDispatcher)
1070
1071class Dispatcher(SiteDispatcher):
1072 pass
1073
1074
mbligh36768f02008-02-22 18:28:33 +00001075class Agent(object):
showard77182562009-06-10 00:16:05 +00001076 """
Alex Miller47715eb2013-07-24 03:34:01 -07001077 An agent for use by the Dispatcher class to perform a task. An agent wraps
1078 around an AgentTask mainly to associate the AgentTask with the queue_entry
1079 and host ids.
showard77182562009-06-10 00:16:05 +00001080
1081 The following methods are required on all task objects:
1082 poll() - Called periodically to let the task check its status and
1083 update its internal state. If the task succeeded.
1084 is_done() - Returns True if the task is finished.
1085 abort() - Called when an abort has been requested. The task must
1086 set its aborted attribute to True if it actually aborted.
1087
1088 The following attributes are required on all task objects:
1089 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001090 success - bool, True if this task succeeded.
1091 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1092 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001093 """
1094
1095
showard418785b2009-11-23 20:19:59 +00001096 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001097 """
Alex Miller47715eb2013-07-24 03:34:01 -07001098 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001099 """
showard8cc058f2009-09-08 16:26:33 +00001100 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001101
showard77182562009-06-10 00:16:05 +00001102 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001103 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001104
showard8cc058f2009-09-08 16:26:33 +00001105 self.queue_entry_ids = task.queue_entry_ids
1106 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001107
showard8cc058f2009-09-08 16:26:33 +00001108 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001109 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001110
1111
jadmanski0afbb632008-06-06 21:10:57 +00001112 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001113 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001114 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001115 self.task.poll()
1116 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001117 self.finished = True
showardec113162008-05-08 00:52:49 +00001118
1119
jadmanski0afbb632008-06-06 21:10:57 +00001120 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001121 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001122
1123
showardd3dc1992009-04-22 21:01:40 +00001124 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001125 if self.task:
1126 self.task.abort()
1127 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001128 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001129 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001130
showardd3dc1992009-04-22 21:01:40 +00001131
beeps5e2bb4a2013-10-28 11:26:45 -07001132class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001133 """
1134 Common functionality for QueueTask and HostlessQueueTask
1135 """
1136 def __init__(self, queue_entries):
1137 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001138 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001139 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001140
1141
showard73ec0442009-02-07 02:05:20 +00001142 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001143 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001144
1145
jamesrenc44ae992010-02-19 00:12:54 +00001146 def _write_control_file(self, execution_path):
1147 control_path = _drone_manager.attach_file_to_execution(
1148 execution_path, self.job.control_file)
1149 return control_path
1150
1151
Aviv Keshet308e7362013-05-21 14:43:16 -07001152 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001153 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001154 execution_path = self.queue_entries[0].execution_path()
1155 control_path = self._write_control_file(execution_path)
1156 hostnames = ','.join(entry.host.hostname
1157 for entry in self.queue_entries
1158 if not entry.is_hostless())
1159
1160 execution_tag = self.queue_entries[0].execution_tag()
1161 params = _autoserv_command_line(
1162 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001163 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001164 _drone_manager.absolute_path(control_path)],
1165 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001166 if self.job.is_image_update_job():
1167 params += ['--image', self.job.update_image_path]
1168
jamesrenc44ae992010-02-19 00:12:54 +00001169 return params
showardd1195652009-12-08 22:21:02 +00001170
1171
    @property
    def num_processes(self):
        # One autoserv process slot per queue entry handled by this task.
        return len(self.queue_entries)
1175
1176
    @property
    def owner_username(self):
        # Processes run by this task are accounted against the job's owner.
        return self.job.owner
1180
1181
    def _working_directory(self):
        # All queue entries must share one execution path; raises otherwise.
        return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001184
1185
jadmanski0afbb632008-06-06 21:10:57 +00001186 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001187 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001188 keyval_dict = self.job.keyval_dict()
1189 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001190 group_name = self.queue_entries[0].get_group_name()
1191 if group_name:
1192 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001193 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001194 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001195 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001196 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001197
1198
showard35162b02009-03-03 02:17:30 +00001199 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001200 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001201 _drone_manager.write_lines_to_file(error_file_path,
1202 [_LOST_PROCESS_ERROR])
1203
1204
showardd3dc1992009-04-22 21:01:40 +00001205 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001206 if not self.monitor:
1207 return
1208
showardd9205182009-04-27 20:09:55 +00001209 self._write_job_finished()
1210
showard35162b02009-03-03 02:17:30 +00001211 if self.monitor.lost_process:
1212 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001213
jadmanskif7fa2cc2008-10-01 14:13:23 +00001214
showardcbd74612008-11-19 21:42:02 +00001215 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001216 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001217 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001218 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001219 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001220
1221
jadmanskif7fa2cc2008-10-01 14:13:23 +00001222 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001223 if not self.monitor or not self.monitor.has_process():
1224 return
1225
jadmanskif7fa2cc2008-10-01 14:13:23 +00001226 # build up sets of all the aborted_by and aborted_on values
1227 aborted_by, aborted_on = set(), set()
1228 for queue_entry in self.queue_entries:
1229 if queue_entry.aborted_by:
1230 aborted_by.add(queue_entry.aborted_by)
1231 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1232 aborted_on.add(t)
1233
1234 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001235 # TODO(showard): this conditional is now obsolete, we just need to leave
1236 # it in temporarily for backwards compatibility over upgrades. delete
1237 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001238 assert len(aborted_by) <= 1
1239 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001240 aborted_by_value = aborted_by.pop()
1241 aborted_on_value = max(aborted_on)
1242 else:
1243 aborted_by_value = 'autotest_system'
1244 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001245
showarda0382352009-02-11 23:36:43 +00001246 self._write_keyval_after_job("aborted_by", aborted_by_value)
1247 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001248
showardcbd74612008-11-19 21:42:02 +00001249 aborted_on_string = str(datetime.datetime.fromtimestamp(
1250 aborted_on_value))
1251 self._write_status_comment('Job aborted by %s on %s' %
1252 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001253
1254
    def abort(self):
        """Abort the task: run the base abort, then record abort metadata
        (aborted_by/aborted_on keyvals and a status comment) before
        finalizing the job's results.
        """
        super(AbstractQueueTask, self).abort()
        # Order matters: log the abort info first, then write the final
        # keyvals / lost-process markers via _finish_task().
        self._log_abort()
        self._finish_task()
showard21baa452008-10-21 00:08:39 +00001259
1260
    def epilog(self):
        """Run the base epilog, then write end-of-job keyvals and any
        lost-process error file.
        """
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001264
1265
class QueueTask(AbstractQueueTask):
    """Queue task that runs a job's queue entries on their assigned hosts."""

    def __init__(self, queue_entries):
        """@param queue_entries: HostQueueEntry objects this task will run."""
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Validate entry/host statuses, run the base prolog, then mark each
        host Running and dirty and write its keyvals.
        """
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.host.set_status(models.Host.Status.RUNNING)
            # Mark the host dirty so it gets cleaned up after the job.
            queue_entry.host.update_field('dirty', 1)


    def _finish_task(self):
        """Finalize results, then move entries to Gathering with their hosts
        left in Running until gathering/cleanup completes.
        """
        super(QueueTask, self)._finish_task()

        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
            queue_entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Build the autoserv invocation, adding SSP and repo-url flags.

        @returns: the base command line, plus --require-ssp when server-side
                packaging applies, plus --verify_job_repo_url.
        """
        invocation = super(QueueTask, self)._command_line()
        # Check if server-side packaging is needed.  require_ssp is tri-state
        # (True/False/None): only an explicit False opts out, so use an
        # identity test against the False singleton (PEP 8 E712) rather
        # than '!= False'.
        if (_enable_ssp_container and
            self.job.control_type == control_data.CONTROL_TYPE.SERVER and
            self.job.require_ssp is not False):
            invocation += ['--require-ssp']
        return invocation + ['--verify_job_repo_url']
1304
1305
class HostlessQueueTask(AbstractQueueTask):
    """Queue task for a server job that runs without any assigned hosts."""

    def __init__(self, queue_entry):
        """@param queue_entry: the single hostless HostQueueEntry to run."""
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()

        # Jobs enter the database in Starting status; each scheduler tick
        # promotes Starting jobs to Running (and spawns an autoserv process
        # on a drone) only while limits such as max_hostless_jobs_per_drone
        # and max_processes_started_per_cycle permit.  An entry still in
        # Starting therefore has no process and no execution_subdir yet, so
        # attempting to parse and collect its logs would raise — skip the
        # transition to Parsing in that case.
        entry = self.queue_entries[0]
        if entry.status != models.HostQueueEntry.Status.STARTING:
            entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001334
1335
# Script entry point: hand control to main(), defined earlier in this module.
if __name__ == '__main__':
    main()