blob: 59af93757c1fb6729a7aa299f37d74d6b825bf66 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Aviv Keshet225bdfe2013-03-05 10:10:08 -08002#pylint: disable-msg=C0111
mbligh36768f02008-02-22 18:28:33 +00003
4"""
5Autotest scheduler
6"""
showard909c7a62008-07-15 21:52:38 +00007
Dan Shif6c65bd2014-08-29 16:15:07 -07008import datetime
9import gc
10import logging
11import optparse
12import os
13import signal
14import sys
15import time
showard402934a2009-12-21 22:20:47 +000016
Alex Miller05d7b4c2013-03-04 07:49:38 -080017import common
showard21baa452008-10-21 00:08:39 +000018from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000019
20import django.db
21
Prashanth B0e960282014-05-13 19:38:28 -070022from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070023from autotest_lib.client.common_lib import utils
Gabe Black1e1c41b2015-02-04 23:55:15 -080024from autotest_lib.client.common_lib.cros.graphite import autotest_stats
Prashanth B0e960282014-05-13 19:38:28 -070025from autotest_lib.frontend.afe import models, rpc_utils
Fang Dengc330bee2014-10-21 18:10:55 -070026from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070027from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
28from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070029from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070030from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070031from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000032from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080033from autotest_lib.scheduler import status_server, scheduler_config
Prashanth Bf66d51b2014-05-06 12:42:25 -070034from autotest_lib.scheduler import scheduler_lib
Aviv Keshet308e7362013-05-21 14:43:16 -070035from autotest_lib.server import autoserv_utils
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080036from autotest_lib.server import utils as server_utils
Dan Shib9144a42014-12-01 16:09:32 -080037from autotest_lib.site_utils import server_manager_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080038
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# An explicit AUTOTEST_DIR in the environment overrides the relative default.
# NOTE: dict.has_key() is deprecated (and gone in Python 3); use `in`.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level state shared by the scheduler; populated by initialize().
_db_manager = None
_db = None
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)
72
mbligh36768f02008-02-22 18:28:33 +000073
mbligh83c1e9e2009-05-01 23:10:41 +000074def _site_init_monitor_db_dummy():
75 return {}
76
77
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled without a default set."""
    if not models.DroneSet.drone_sets_enabled():
        return
    if not models.DroneSet.default_drone_set_name():
        raise scheduler_lib.SchedulerError(
                'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000083
84
def _sanity_check():
    """Validate configuration consistency before the scheduler starts."""
    _verify_default_drone_set_exists()
88
89
def main():
    """Scheduler entry point.

    Runs the real main loop, logging any exception that escapes it, and
    always removes the scheduler pid file on the way out.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # Normal exit path (e.g. bad args, disabled scheduler): re-raise
            # without logging a stack trace.
            raise
        except:
            # Deliberately bare: log absolutely everything (including
            # KeyboardInterrupt) before re-raising.
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000101
102
def main_without_exception_handling():
    """Parse options, verify config, then run the scheduler loop until shutdown.

    Expects exactly one positional argument: the results directory.  Ticks the
    Dispatcher until either a signal sets _shutdown or the status server
    requests a scheduler shutdown, then tears down drones and the DB
    connection.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Optional site hook; falls back to the no-op dummy when no site module.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop: one tick per iteration until a shutdown is requested
        # either via signal (_shutdown) or via the status server.
        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except Exception:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Teardown runs on both clean shutdown and uncaught exception.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000179
180
def handle_signal(signum, frame):
    """Signal handler: request a clean scheduler shutdown.

    @param signum: delivered signal number (unused beyond logging intent).
    @param frame: current stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000185
186
def initialize():
    """One-time scheduler startup: pid file, DB, signals, drones.

    Order matters here: the pid file guards against double-start before any
    state is touched; the DB connection must exist before signal handlers and
    drone setup; the drone manager is initialized last.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start a second scheduler against the same pid file.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point at the stress-test database instead of the real one.
        global_config.global_config.override_config_value(
            scheduler_lib.DB_CONFIG_SECTION, 'database',
            'stresstest_autotest_web')

    # If server database is enabled, check if the server has role `scheduler`.
    # If the server does not have scheduler role, exception will be raised and
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    # Drone list comes from the server DB when enabled, else global config.
    if server_manager_utils.use_server_db():
        drone_list = server_manager_utils.get_drones()
    else:
        drones = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
        drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000231
232
def initialize_globals():
    """Create the module-global DroneManager used throughout the scheduler."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
236
237
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """Build the autoserv command line for a job or special task.

    @param machines: a machine, or comma-separated list of machines, for the
            (-m) flag.
    @param extra_args: list of additional arguments to pass to autoserv.
    @param job: Job object; if supplied, -u owner, -l name, --test-retry, and
            client -c or server -s parameters will be added.
    @param queue_entry: HostQueueEntry object; if supplied and no Job object
            was supplied, this will be used to look up the Job object.
    @returns the autoserv command line as a list of executable + parameters.
    """
    return autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
showard87ba02a2009-04-20 19:37:32 +0000255
256
Simran Basia858a232012-08-21 11:04:37 -0700257class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800258
259
    def __init__(self):
        """Wire up dispatcher state from global config and module globals.

        Reads the module-global _db (set by initialize()) for the cleanup
        helpers, so construction must happen after module initialization.
        """
        # Active Agent objects, plus indexes by host id and queue-entry id.
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        # Throttle for _garbage_collection(): default once per 6 hours.
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        # Debug-logging switches consumed by _log_tick_msg/_log_extra_msg.
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
291
mbligh36768f02008-02-22 18:28:33 +0000292
showard915958d2009-04-22 21:00:58 +0000293 def initialize(self, recover_hosts=True):
294 self._periodic_cleanup.initialize()
295 self._24hr_upkeep.initialize()
296
jadmanski0afbb632008-06-06 21:10:57 +0000297 # always recover processes
298 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000299
jadmanski0afbb632008-06-06 21:10:57 +0000300 if recover_hosts:
301 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000302
303
Simran Basi0ec94dd2012-08-28 09:50:10 -0700304 def _log_tick_msg(self, msg):
305 if self._tick_debug:
306 logging.debug(msg)
307
308
Simran Basidef92872012-09-20 13:34:34 -0700309 def _log_extra_msg(self, msg):
310 if self._extra_debugging:
311 logging.debug(msg)
312
313
    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.

        The step order is significant: refresh is triggered early and synced
        late so drone status collection overlaps the scheduling work between
        trigger_refresh() and sync_refresh().
        """
        timer = autotest_stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
        _drone_manager.trigger_refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
        _drone_manager.sync_refresh()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000357
showard97aed502008-11-04 02:01:24 +0000358
mblighf3294cc2009-04-08 21:17:38 +0000359 def _run_cleanup(self):
360 self._periodic_cleanup.run_cleanup_maybe()
361 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000362
mbligh36768f02008-02-22 18:28:33 +0000363
showardf13a9e22009-12-18 22:54:09 +0000364 def _garbage_collection(self):
365 threshold_time = time.time() - self._seconds_between_garbage_stats
366 if threshold_time < self._last_garbage_stats_time:
367 # Don't generate these reports very often.
368 return
369
370 self._last_garbage_stats_time = time.time()
371 # Force a full level 0 collection (because we can, it doesn't hurt
372 # at this interval).
373 gc.collect()
374 logging.info('Logging garbage collector stats on tick %d.',
375 self._tick_count)
376 gc_stats._log_garbage_collector_stats()
377
378
showard170873e2009-01-07 00:22:26 +0000379 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
380 for object_id in object_ids:
381 agent_dict.setdefault(object_id, set()).add(agent)
382
383
384 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
385 for object_id in object_ids:
386 assert object_id in agent_dict
387 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700388 # If an ID has no more active agent associated, there is no need to
389 # keep it in the dictionary. Otherwise, scheduler will keep an
390 # unnecessarily big dictionary until being restarted.
391 if not agent_dict[object_id]:
392 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000393
394
showardd1195652009-12-08 22:21:02 +0000395 def add_agent_task(self, agent_task):
beeps8bb1f7d2013-08-05 01:30:09 -0700396 """
397 Creates and adds an agent to the dispatchers list.
398
399 In creating the agent we also pass on all the queue_entry_ids and
400 host_ids from the special agent task. For every agent we create, we
401 add it to 1. a dict against the queue_entry_ids given to it 2. A dict
402 against the host_ids given to it. So theoritically, a host can have any
403 number of agents associated with it, and each of them can have any
404 special agent task, though in practice we never see > 1 agent/task per
405 host at any time.
406
407 @param agent_task: A SpecialTask for the agent to manage.
408 """
showardd1195652009-12-08 22:21:02 +0000409 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000410 self._agents.append(agent)
411 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000412 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
413 self._register_agent_for_ids(self._queue_entry_agents,
414 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000415
showard170873e2009-01-07 00:22:26 +0000416
417 def get_agents_for_entry(self, queue_entry):
418 """
419 Find agents corresponding to the specified queue_entry.
420 """
showardd3dc1992009-04-22 21:01:40 +0000421 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000422
423
424 def host_has_agent(self, host):
425 """
426 Determine if there is currently an Agent present using this host.
427 """
428 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000429
430
jadmanski0afbb632008-06-06 21:10:57 +0000431 def remove_agent(self, agent):
432 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000433 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
434 agent)
435 self._unregister_agent_for_ids(self._queue_entry_agents,
436 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000437
438
showard8cc058f2009-09-08 16:26:33 +0000439 def _host_has_scheduled_special_task(self, host):
440 return bool(models.SpecialTask.objects.filter(host__id=host.id,
441 is_active=False,
442 is_complete=False))
443
444
    def _recover_processes(self):
        """Reattach to autoserv processes left over from a previous scheduler.

        Ordering matters: pidfiles must be registered before the drone
        refresh so the refresh collects their contents, and recovery must
        complete before drones are reinitialized.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000457
showard170873e2009-01-07 00:22:26 +0000458
showardd1195652009-12-08 22:21:02 +0000459 def _create_recovery_agent_tasks(self):
460 return (self._get_queue_entry_agent_tasks()
461 + self._get_special_task_agent_tasks(is_active=True))
462
463
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)
        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
                'running', len(queue_entries))

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            # A multi-machine task claims its whole entry group at once.
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000502
503
showardd1195652009-12-08 22:21:02 +0000504 def _get_special_task_agent_tasks(self, is_active=False):
505 special_tasks = models.SpecialTask.objects.filter(
506 is_active=is_active, is_complete=False)
507 return [self._get_agent_task_for_special_task(task)
508 for task in special_tasks]
509
510
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        @raises SchedulerError: if the entry's status has no matching task.
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        # Dispatch on status: Starting/Running -> (Hostless)QueueTask,
        # Gathering/Parsing/Archiving -> the matching post-job task.
        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000536
537
538 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000539 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
540 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000541 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000542 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000543 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000544 if using_host:
showardd1195652009-12-08 22:21:02 +0000545 self._assert_host_has_no_agent(task_entry)
546
547
548 def _assert_host_has_no_agent(self, entry):
549 """
550 @param entry: a HostQueueEntry or a SpecialTask
551 """
552 if self.host_has_agent(entry.host):
553 agent = tuple(self._host_agents.get(entry.host.id))[0]
Prashanth B0e960282014-05-13 19:38:28 -0700554 raise scheduler_lib.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000555 'While scheduling %s, host %s already has a host agent %s'
556 % (entry, entry.host, agent.task))
557
558
    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
        object contains a hqe it's passed on to the special agent task, which
        creates a HostQueueEntry and saves it as its queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        @raises SchedulerError: if no AgentTask class matches the task type.
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        # Match on the class-level TASK_TYPE tag rather than isinstance.
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise scheduler_lib.SchedulerError(
                'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000589
590
591 def _register_pidfiles(self, agent_tasks):
592 for agent_task in agent_tasks:
593 agent_task.register_necessary_pidfiles()
594
595
596 def _recover_tasks(self, agent_tasks):
597 orphans = _drone_manager.get_orphaned_autoserv_processes()
598
599 for agent_task in agent_tasks:
600 agent_task.recover()
601 if agent_task.monitor and agent_task.monitor.has_process():
602 orphans.discard(agent_task.monitor.get_process())
603 self.add_agent_task(agent_task)
604
605 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000606
607
showard8cc058f2009-09-08 16:26:33 +0000608 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000609 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
610 % status):
showard0db3d432009-10-12 20:29:15 +0000611 if entry.status == status and not self.get_agents_for_entry(entry):
612 # The status can change during iteration, e.g., if job.run()
613 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000614 yield entry
615
616
showard6878e8b2009-07-20 22:37:45 +0000617 def _check_for_remaining_orphan_processes(self, orphans):
618 if not orphans:
619 return
620 subject = 'Unrecovered orphan autoserv processes remain'
621 message = '\n'.join(str(process) for process in orphans)
622 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000623
624 die_on_orphans = global_config.global_config.get_config_value(
625 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
626
627 if die_on_orphans:
628 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000629
showard170873e2009-01-07 00:22:26 +0000630
showard8cc058f2009-09-08 16:26:33 +0000631 def _recover_pending_entries(self):
632 for entry in self._get_unassigned_entries(
633 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000634 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000635 entry.on_pending()
636
637
showardb8900452009-10-12 20:31:01 +0000638 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000639 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000640 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
641 unrecovered_hqes = []
642 for queue_entry in queue_entries:
643 special_tasks = models.SpecialTask.objects.filter(
644 task__in=(models.SpecialTask.Task.CLEANUP,
645 models.SpecialTask.Task.VERIFY),
646 queue_entry__id=queue_entry.id,
647 is_complete=False)
648 if special_tasks.count() == 0:
649 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000650
showardb8900452009-10-12 20:31:01 +0000651 if unrecovered_hqes:
652 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700653 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000654 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000655 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000656
657
showard65db3932009-10-28 19:54:35 +0000658 def _schedule_special_tasks(self):
659 """
660 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700661
662 Special tasks include PreJobTasks like verify, reset and cleanup.
663 They are created through _schedule_new_jobs and associated with a hqe
664 This method translates SpecialTasks to the appropriate AgentTask and
665 adds them to the dispatchers agents list, so _handle_agents can execute
666 them.
showard65db3932009-10-28 19:54:35 +0000667 """
Prashanth B4ec98672014-05-15 10:44:54 -0700668 # When the host scheduler is responsible for acquisition we only want
669 # to run tasks with leased hosts. All hqe tasks will already have
670 # leased hosts, and we don't want to run frontend tasks till the host
671 # scheduler has vetted the assignment. Note that this doesn't include
672 # frontend tasks with hosts leased by other active hqes.
673 for task in self._job_query_manager.get_prioritized_special_tasks(
674 only_tasks_with_leased_hosts=not _inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000675 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000676 continue
showardd1195652009-12-08 22:21:02 +0000677 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000678
679
showard170873e2009-01-07 00:22:26 +0000680 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000681 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000682 # should never happen
showarded2afea2009-07-07 20:54:07 +0000683 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000684 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000685 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700686 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000687 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000688
689
jadmanski0afbb632008-06-06 21:10:57 +0000690 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000691 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700692 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000693 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000694 if self.host_has_agent(host):
695 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000696 continue
showard8cc058f2009-09-08 16:26:33 +0000697 if self._host_has_scheduled_special_task(host):
698 # host will have a special task scheduled on the next cycle
699 continue
showard170873e2009-01-07 00:22:26 +0000700 if print_message:
showardb18134f2009-03-20 20:52:18 +0000701 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000702 models.SpecialTask.objects.create(
703 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000704 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000705
706
jadmanski0afbb632008-06-06 21:10:57 +0000707 def _recover_hosts(self):
708 # recover "Repair Failed" hosts
709 message = 'Reverifying dead host %s'
710 self._reverify_hosts_where("status = 'Repair Failed'",
711 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000712
713
showard89f84db2009-03-12 20:39:13 +0000714 def _refresh_pending_queue_entries(self):
715 """
716 Lookup the pending HostQueueEntries and call our HostScheduler
717 refresh() method given that list. Return the list.
718
719 @returns A list of pending HostQueueEntries sorted in priority order.
720 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700721 queue_entries = self._job_query_manager.get_pending_queue_entries(
722 only_hostless=not _inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000723 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000724 return []
showard89f84db2009-03-12 20:39:13 +0000725 return queue_entries
726
727
showarda9545c02009-12-18 22:44:26 +0000728 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800729 """Schedule a hostless (suite) job.
730
731 @param queue_entry: The queue_entry representing the hostless job.
732 """
showarda9545c02009-12-18 22:44:26 +0000733 self.add_agent_task(HostlessQueueTask(queue_entry))
Jakob Juelichd615a1e2014-09-04 11:48:36 -0700734
735 # Need to set execution_subdir before setting the status:
736 # After a restart of the scheduler, agents will be restored for HQEs in
737 # Starting, Running, Gathering, Parsing or Archiving. To do this, the
738 # execution_subdir is needed. Therefore it must be set before entering
739 # one of these states.
740 # Otherwise, if the scheduler was interrupted between setting the status
741 # and the execution_subdir, upon it's restart restoring agents would
742 # fail.
743 # Is there a way to get a status in one of these states without going
744 # through this code? Following cases are possible:
745 # - If it's aborted before being started:
746 # active bit will be 0, so there's nothing to parse, it will just be
747 # set to completed by _find_aborting. Critical statuses are skipped.
748 # - If it's aborted or it fails after being started:
749 # It was started, so this code was executed.
750 queue_entry.update_field('execution_subdir', 'hostless')
jamesren47bd7372010-03-13 00:58:17 +0000751 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000752
753
beepscc9fc702013-12-02 12:45:38 -0800754 def _schedule_host_job(self, host, queue_entry):
755 """Schedules a job on the given host.
756
757 1. Assign the host to the hqe, if it isn't already assigned.
758 2. Create a SpecialAgentTask for the hqe.
759 3. Activate the hqe.
760
761 @param queue_entry: The job to schedule.
762 @param host: The host to schedule the job on.
763 """
764 if self.host_has_agent(host):
765 host_agent_task = list(self._host_agents.get(host.id))[0].task
766 subject = 'Host with agents assigned to an HQE'
767 message = ('HQE: %s assigned host %s, but the host has '
768 'agent: %s for queue_entry %s. The HQE '
beepsf7d35162014-02-13 16:42:13 -0800769 'will have to try and acquire a host next tick ' %
beepscc9fc702013-12-02 12:45:38 -0800770 (queue_entry, host.hostname, host_agent_task,
771 host_agent_task.queue_entry))
772 email_manager.manager.enqueue_notify_email(subject, message)
beepscc9fc702013-12-02 12:45:38 -0800773 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700774 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800775
776
showard89f84db2009-03-12 20:39:13 +0000777 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700778 """
779 Find any new HQEs and call schedule_pre_job_tasks for it.
780
781 This involves setting the status of the HQE and creating a row in the
782 db corresponding the the special task, through
783 scheduler_models._queue_special_task. The new db row is then added as
784 an agent to the dispatcher through _schedule_special_tasks and
785 scheduled for execution on the drone through _handle_agents.
786 """
showard89f84db2009-03-12 20:39:13 +0000787 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000788
beepscc9fc702013-12-02 12:45:38 -0800789 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700790 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700791 new_jobs_with_hosts = 0
792 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800793 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700794 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000795
beepscc9fc702013-12-02 12:45:38 -0800796 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000797 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000798 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700799 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000800 else:
beepscc9fc702013-12-02 12:45:38 -0800801 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700802 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700803
Gabe Black1e1c41b2015-02-04 23:55:15 -0800804 autotest_stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
beepscc9fc702013-12-02 12:45:38 -0800805 if not host_jobs:
806 return
Prashanth Bf66d51b2014-05-06 12:42:25 -0700807 if not _inline_host_acquisition:
808 message = ('Found %s jobs that need hosts though '
809 '_inline_host_acquisition=%s. Will acquire hosts.' %
810 ([str(job) for job in host_jobs],
811 _inline_host_acquisition))
812 email_manager.manager.enqueue_notify_email(
813 'Processing unexpected host acquisition requests', message)
814 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
815 for host_assignment in jobs_with_hosts:
816 self._schedule_host_job(host_assignment.host, host_assignment.job)
817 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800818
Gabe Black1e1c41b2015-02-04 23:55:15 -0800819 autotest_stats.Gauge(key).send('new_jobs_with_hosts',
820 new_jobs_with_hosts)
821 autotest_stats.Gauge(key).send('new_jobs_without_hosts',
822 new_jobs_need_hosts -
823 new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000824
825
showard8cc058f2009-09-08 16:26:33 +0000826 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700827 """
828 Adds agents to the dispatcher.
829
830 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
831 QueueTask for example, will have a job with a control file, and
832 the agent will have methods that poll, abort and check if the queue
833 task is finished. The dispatcher runs the agent_task, as well as
834 other agents in it's _agents member, through _handle_agents, by
835 calling the Agents tick().
836
837 This method creates an agent for each HQE in one of (starting, running,
838 gathering, parsing, archiving) states, and adds it to the dispatcher so
839 it is handled by _handle_agents.
840 """
showardd1195652009-12-08 22:21:02 +0000841 for agent_task in self._get_queue_entry_agent_tasks():
842 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000843
844
845 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000846 for entry in scheduler_models.HostQueueEntry.fetch(
847 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000848 task = entry.job.schedule_delayed_callback_task(entry)
849 if task:
showardd1195652009-12-08 22:21:02 +0000850 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000851
852
jadmanski0afbb632008-06-06 21:10:57 +0000853 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700854 """
855 Looks through the afe_host_queue_entries for an aborted entry.
856
857 The aborted bit is set on an HQE in many ways, the most common
858 being when a user requests an abort through the frontend, which
859 results in an rpc from the afe to abort_host_queue_entries.
860 """
jamesrene7c65cb2010-06-08 20:38:10 +0000861 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000862 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700863 where='aborted=1 and complete=0'):
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800864
865 # If the job is running on a shard, let the shard handle aborting
866 # it and sync back the right status.
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -0800867 if entry.job.shard_id is not None and not server_utils.is_shard():
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800868 logging.info('Waiting for shard %s to abort hqe %s',
869 entry.job.shard_id, entry)
870 continue
871
showardf4a2e502009-07-28 20:06:39 +0000872 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800873
874 # The task would have started off with both is_complete and
875 # is_active = False. Aborted tasks are neither active nor complete.
876 # For all currently active tasks this will happen through the agent,
877 # but we need to manually update the special tasks that haven't
878 # started yet, because they don't have agents.
879 models.SpecialTask.objects.filter(is_active=False,
880 queue_entry_id=entry.id).update(is_complete=True)
881
showardd3dc1992009-04-22 21:01:40 +0000882 for agent in self.get_agents_for_entry(entry):
883 agent.abort()
884 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000885 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700886 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000887 for job in jobs_to_stop:
888 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000889
890
beeps8bb1f7d2013-08-05 01:30:09 -0700891 def _find_aborted_special_tasks(self):
892 """
893 Find SpecialTasks that have been marked for abortion.
894
895 Poll the database looking for SpecialTasks that are active
896 and have been marked for abortion, then abort them.
897 """
898
899 # The completed and active bits are very important when it comes
900 # to scheduler correctness. The active bit is set through the prolog
901 # of a special task, and reset through the cleanup method of the
902 # SpecialAgentTask. The cleanup is called both through the abort and
903 # epilog. The complete bit is set in several places, and in general
904 # a hanging job will have is_active=1 is_complete=0, while a special
905 # task which completed will have is_active=0 is_complete=1. To check
906 # aborts we directly check active because the complete bit is set in
907 # several places, including the epilog of agent tasks.
908 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
909 is_aborted=True)
910 for task in aborted_tasks:
911 # There are 2 ways to get the agent associated with a task,
912 # through the host and through the hqe. A special task
913 # always needs a host, but doesn't always need a hqe.
914 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700915 if isinstance(agent.task, agent_task.SpecialAgentTask):
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000916
beeps8bb1f7d2013-08-05 01:30:09 -0700917 # The epilog preforms critical actions such as
918 # queueing the next SpecialTask, requeuing the
919 # hqe etc, however it doesn't actually kill the
920 # monitor process and set the 'done' bit. Epilogs
921 # assume that the job failed, and that the monitor
922 # process has already written an exit code. The
923 # done bit is a necessary condition for
924 # _handle_agents to schedule any more special
925 # tasks against the host, and it must be set
926 # in addition to is_active, is_complete and success.
927 agent.task.epilog()
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000928 agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700929
930
showard324bf812009-01-20 23:23:38 +0000931 def _can_start_agent(self, agent, num_started_this_cycle,
932 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000933 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000934 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000935 return True
936 # don't allow any nonzero-process agents to run after we've reached a
937 # limit (this avoids starvation of many-process agents)
938 if have_reached_limit:
939 return False
940 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000941 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000942 agent.task.owner_username,
943 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000944 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000945 return False
946 # if a single agent exceeds the per-cycle throttling, still allow it to
947 # run when it's the first agent in the cycle
948 if num_started_this_cycle == 0:
949 return True
950 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +0000951 if (num_started_this_cycle + agent.task.num_processes >
952 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000953 return False
954 return True
955
956
jadmanski0afbb632008-06-06 21:10:57 +0000957 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700958 """
959 Handles agents of the dispatcher.
960
961 Appropriate Agents are added to the dispatcher through
962 _schedule_running_host_queue_entries. These agents each
963 have a task. This method runs the agents task through
964 agent.tick() leading to:
965 agent.start
966 prolog -> AgentTasks prolog
967 For each queue entry:
968 sets host status/status to Running
969 set started_on in afe_host_queue_entries
970 run -> AgentTasks run
971 Creates PidfileRunMonitor
972 Queues the autoserv command line for this AgentTask
973 via the drone manager. These commands are executed
974 through the drone managers execute actions.
975 poll -> AgentTasks/BaseAgentTask poll
976 checks the monitors exit_code.
977 Executes epilog if task is finished.
978 Executes AgentTasks _finish_task
979 finish_task is usually responsible for setting the status
980 of the HQE/host, and updating it's active and complete fileds.
981
982 agent.is_done
983 Removed the agent from the dispatchers _agents queue.
984 Is_done checks the finished bit on the agent, that is
985 set based on the Agents task. During the agents poll
986 we check to see if the monitor process has exited in
987 it's finish method, and set the success member of the
988 task based on this exit code.
989 """
jadmanski0afbb632008-06-06 21:10:57 +0000990 num_started_this_cycle = 0
Prashanth Balasubramanian628bfcf2014-10-02 12:44:13 -0700991 num_finished_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000992 have_reached_limit = False
993 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -0700994 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +0000995 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -0700996 self._log_extra_msg('Processing Agent with Host Ids: %s and '
997 'queue_entry ids:%s' % (agent.host_ids,
998 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +0000999 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001000 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001001 have_reached_limit):
1002 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -07001003 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +00001004 continue
showardd1195652009-12-08 22:21:02 +00001005 num_started_this_cycle += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001006 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +00001007 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -07001008 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +00001009 if agent.is_done():
Prashanth Balasubramanian628bfcf2014-10-02 12:44:13 -07001010 num_finished_this_cycle += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001011 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +00001012 self.remove_agent(agent)
Gabe Black1e1c41b2015-02-04 23:55:15 -08001013 autotest_stats.Gauge('scheduler.jobs_per_tick').send(
Prashanth Balasubramanian628bfcf2014-10-02 12:44:13 -07001014 'agents_started', num_started_this_cycle)
Gabe Black1e1c41b2015-02-04 23:55:15 -08001015 autotest_stats.Gauge('scheduler.jobs_per_tick').send(
Prashanth Balasubramanian628bfcf2014-10-02 12:44:13 -07001016 'agents_finished', num_finished_this_cycle)
Simran Basi3f6717d2012-09-13 15:21:22 -07001017 logging.info('%d running processes. %d added this cycle.',
1018 _drone_manager.total_running_processes(),
1019 num_started_this_cycle)
mbligh36768f02008-02-22 18:28:33 +00001020
1021
showard29f7cd22009-04-29 21:16:24 +00001022 def _process_recurring_runs(self):
1023 recurring_runs = models.RecurringRun.objects.filter(
1024 start_date__lte=datetime.datetime.now())
1025 for rrun in recurring_runs:
1026 # Create job from template
1027 job = rrun.job
1028 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001029 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001030
1031 host_objects = info['hosts']
1032 one_time_hosts = info['one_time_hosts']
1033 metahost_objects = info['meta_hosts']
1034 dependencies = info['dependencies']
1035 atomic_group = info['atomic_group']
1036
1037 for host in one_time_hosts or []:
1038 this_host = models.Host.create_one_time_host(host.hostname)
1039 host_objects.append(this_host)
1040
1041 try:
1042 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001043 options=options,
showard29f7cd22009-04-29 21:16:24 +00001044 host_objects=host_objects,
1045 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001046 atomic_group=atomic_group)
1047
1048 except Exception, ex:
1049 logging.exception(ex)
1050 #TODO send email
1051
1052 if rrun.loop_count == 1:
1053 rrun.delete()
1054 else:
1055 if rrun.loop_count != 0: # if not infinite loop
1056 # calculate new start_date
1057 difference = datetime.timedelta(seconds=rrun.loop_period)
1058 rrun.start_date = rrun.start_date + difference
1059 rrun.loop_count -= 1
1060 rrun.save()
1061
1062
Simran Basia858a232012-08-21 11:04:37 -07001063SiteDispatcher = utils.import_site_class(
1064 __file__, 'autotest_lib.scheduler.site_monitor_db',
1065 'SiteDispatcher', BaseDispatcher)
1066
1067class Dispatcher(SiteDispatcher):
1068 pass
1069
1070
mbligh36768f02008-02-22 18:28:33 +00001071class Agent(object):
showard77182562009-06-10 00:16:05 +00001072 """
Alex Miller47715eb2013-07-24 03:34:01 -07001073 An agent for use by the Dispatcher class to perform a task. An agent wraps
1074 around an AgentTask mainly to associate the AgentTask with the queue_entry
1075 and host ids.
showard77182562009-06-10 00:16:05 +00001076
1077 The following methods are required on all task objects:
1078 poll() - Called periodically to let the task check its status and
1079 update its internal state. If the task succeeded.
1080 is_done() - Returns True if the task is finished.
1081 abort() - Called when an abort has been requested. The task must
1082 set its aborted attribute to True if it actually aborted.
1083
1084 The following attributes are required on all task objects:
1085 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001086 success - bool, True if this task succeeded.
1087 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1088 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001089 """
1090
1091
showard418785b2009-11-23 20:19:59 +00001092 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001093 """
Alex Miller47715eb2013-07-24 03:34:01 -07001094 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001095 """
showard8cc058f2009-09-08 16:26:33 +00001096 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001097
showard77182562009-06-10 00:16:05 +00001098 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001099 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001100
showard8cc058f2009-09-08 16:26:33 +00001101 self.queue_entry_ids = task.queue_entry_ids
1102 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001103
showard8cc058f2009-09-08 16:26:33 +00001104 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001105 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001106
1107
jadmanski0afbb632008-06-06 21:10:57 +00001108 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001109 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001110 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001111 self.task.poll()
1112 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001113 self.finished = True
showardec113162008-05-08 00:52:49 +00001114
1115
jadmanski0afbb632008-06-06 21:10:57 +00001116 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001117 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001118
1119
showardd3dc1992009-04-22 21:01:40 +00001120 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001121 if self.task:
1122 self.task.abort()
1123 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001124 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001125 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001126
showardd3dc1992009-04-22 21:01:40 +00001127
beeps5e2bb4a2013-10-28 11:26:45 -07001128class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001129 """
1130 Common functionality for QueueTask and HostlessQueueTask
1131 """
    def __init__(self, queue_entries):
        """
        @param queue_entries: non-empty list of HostQueueEntries; only the
                first entry's job is recorded, so they are assumed to all
                belong to the same job — TODO confirm with callers.
        """
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001136
1137
showard73ec0442009-02-07 02:05:20 +00001138 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001139 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001140
1141
jamesrenc44ae992010-02-19 00:12:54 +00001142 def _write_control_file(self, execution_path):
1143 control_path = _drone_manager.attach_file_to_execution(
1144 execution_path, self.job.control_file)
1145 return control_path
1146
1147
Aviv Keshet308e7362013-05-21 14:43:16 -07001148 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001149 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001150 execution_path = self.queue_entries[0].execution_path()
1151 control_path = self._write_control_file(execution_path)
1152 hostnames = ','.join(entry.host.hostname
1153 for entry in self.queue_entries
1154 if not entry.is_hostless())
1155
1156 execution_tag = self.queue_entries[0].execution_tag()
1157 params = _autoserv_command_line(
1158 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001159 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001160 _drone_manager.absolute_path(control_path)],
1161 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001162 if self.job.is_image_update_job():
1163 params += ['--image', self.job.update_image_path]
1164
jamesrenc44ae992010-02-19 00:12:54 +00001165 return params
showardd1195652009-12-08 22:21:02 +00001166
1167
    @property
    def num_processes(self):
        """Number of processes this task accounts for: one per queue entry."""
        return len(self.queue_entries)
1171
1172
    @property
    def owner_username(self):
        """Username of the owner of this task's job."""
        return self.job.owner
1176
1177
    def _working_directory(self):
        """@returns the execution path shared by all queue entries."""
        return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001180
1181
jadmanski0afbb632008-06-06 21:10:57 +00001182 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001183 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001184 keyval_dict = self.job.keyval_dict()
1185 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001186 group_name = self.queue_entries[0].get_group_name()
1187 if group_name:
1188 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001189 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001190 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001191 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001192 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001193
1194
showard35162b02009-03-03 02:17:30 +00001195 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001196 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001197 _drone_manager.write_lines_to_file(error_file_path,
1198 [_LOST_PROCESS_ERROR])
1199
1200
showardd3dc1992009-04-22 21:01:40 +00001201 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001202 if not self.monitor:
1203 return
1204
showardd9205182009-04-27 20:09:55 +00001205 self._write_job_finished()
1206
showard35162b02009-03-03 02:17:30 +00001207 if self.monitor.lost_process:
1208 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001209
jadmanskif7fa2cc2008-10-01 14:13:23 +00001210
showardcbd74612008-11-19 21:42:02 +00001211 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001212 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001213 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001214 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001215 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001216
1217
jadmanskif7fa2cc2008-10-01 14:13:23 +00001218 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001219 if not self.monitor or not self.monitor.has_process():
1220 return
1221
jadmanskif7fa2cc2008-10-01 14:13:23 +00001222 # build up sets of all the aborted_by and aborted_on values
1223 aborted_by, aborted_on = set(), set()
1224 for queue_entry in self.queue_entries:
1225 if queue_entry.aborted_by:
1226 aborted_by.add(queue_entry.aborted_by)
1227 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1228 aborted_on.add(t)
1229
1230 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001231 # TODO(showard): this conditional is now obsolete, we just need to leave
1232 # it in temporarily for backwards compatibility over upgrades. delete
1233 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001234 assert len(aborted_by) <= 1
1235 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001236 aborted_by_value = aborted_by.pop()
1237 aborted_on_value = max(aborted_on)
1238 else:
1239 aborted_by_value = 'autotest_system'
1240 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001241
showarda0382352009-02-11 23:36:43 +00001242 self._write_keyval_after_job("aborted_by", aborted_by_value)
1243 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001244
showardcbd74612008-11-19 21:42:02 +00001245 aborted_on_string = str(datetime.datetime.fromtimestamp(
1246 aborted_on_value))
1247 self._write_status_comment('Job aborted by %s on %s' %
1248 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001249
1250
jadmanski0afbb632008-06-06 21:10:57 +00001251 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001252 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001253 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001254 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001255
1256
jadmanski0afbb632008-06-06 21:10:57 +00001257 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001258 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001259 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001260
1261
class QueueTask(AbstractQueueTask):
    """An AbstractQueueTask whose job runs against one or more hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Verify entry/host statuses, then mark the hosts as running."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            host = entry.host
            self._write_host_keyvals(host)
            host.set_status(models.Host.Status.RUNNING)
            # A job has now touched this host, so flag it dirty.
            host.update_field('dirty', 1)


    def _finish_task(self):
        """Finish job bookkeeping and advance entries to GATHERING."""
        super(QueueTask, self)._finish_task()

        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Append --verify_job_repo_url to the inherited command line."""
        base_invocation = super(QueueTask, self)._command_line()
        return base_invocation + ['--verify_job_repo_url']
1295
1296
Dan Shi1a189052013-10-28 14:41:35 -07001297class HostlessQueueTask(AbstractQueueTask):
mbligh4608b002010-01-05 18:22:35 +00001298 def __init__(self, queue_entry):
1299 super(HostlessQueueTask, self).__init__([queue_entry])
1300 self.queue_entry_ids = [queue_entry.id]
1301
1302
1303 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001304 super(HostlessQueueTask, self).prolog()
1305
1306
mbligh4608b002010-01-05 18:22:35 +00001307 def _finish_task(self):
1308 super(HostlessQueueTask, self)._finish_task()
Dan Shi76af8022013-10-19 01:59:49 -07001309
1310 # When a job is added to database, its initial status is always
1311 # Starting. In a scheduler tick, scheduler finds all jobs in Starting
1312 # status, check if any of them can be started. If scheduler hits some
Alex Millerac189f32014-06-23 13:55:23 -07001313 # limit, e.g., max_hostless_jobs_per_drone,
1314 # max_processes_started_per_cycle, scheduler will leave these jobs in
1315 # Starting status. Otherwise, the jobs' status will be changed to
1316 # Running, and an autoserv process will be started in drone for each of
1317 # these jobs.
Dan Shi76af8022013-10-19 01:59:49 -07001318 # If the entry is still in status Starting, the process has not started
1319 # yet. Therefore, there is no need to parse and collect log. Without
1320 # this check, exception will be raised by scheduler as execution_subdir
1321 # for this queue entry does not have a value yet.
1322 hqe = self.queue_entries[0]
1323 if hqe.status != models.HostQueueEntry.Status.STARTING:
1324 hqe.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001325
1326
mbligh36768f02008-02-22 18:28:33 +00001327if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00001328 main()