blob: 201d2aff1926726e1eab594fd0cdce4b55eeb84e [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Aviv Keshet225bdfe2013-03-05 10:10:08 -08002#pylint: disable-msg=C0111
mbligh36768f02008-02-22 18:28:33 +00003
4"""
5Autotest scheduler
6"""
showard909c7a62008-07-15 21:52:38 +00007
Dan Shif6c65bd2014-08-29 16:15:07 -07008import datetime
9import gc
10import logging
11import optparse
12import os
13import signal
14import sys
15import time
showard402934a2009-12-21 22:20:47 +000016
Alex Miller05d7b4c2013-03-04 07:49:38 -080017import common
showard21baa452008-10-21 00:08:39 +000018from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000019
20import django.db
21
Dan Shiec1d47d2015-02-13 11:38:13 -080022from autotest_lib.client.common_lib import control_data
Prashanth B0e960282014-05-13 19:38:28 -070023from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070024from autotest_lib.client.common_lib import utils
Gabe Black1e1c41b2015-02-04 23:55:15 -080025from autotest_lib.client.common_lib.cros.graphite import autotest_stats
Prashanth B0e960282014-05-13 19:38:28 -070026from autotest_lib.frontend.afe import models, rpc_utils
Fang Dengc330bee2014-10-21 18:10:55 -070027from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070028from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
29from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070030from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070031from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070032from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000033from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080034from autotest_lib.scheduler import status_server, scheduler_config
Prashanth Bf66d51b2014-05-06 12:42:25 -070035from autotest_lib.scheduler import scheduler_lib
Aviv Keshet308e7362013-05-21 14:43:16 -070036from autotest_lib.server import autoserv_utils
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080037from autotest_lib.server import utils as server_utils
Dan Shib9144a42014-12-01 16:09:32 -080038from autotest_lib.site_utils import server_manager_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080039
# Pidfile prefixes used with utils.write_pid / utils.program_is_alive.
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
# Root of the autotest tree; default is one level above this file.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# $AUTOTEST_DIR overrides the default tree location.  Membership test uses
# `in` rather than the long-deprecated, Python-2-only dict.has_key().
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000053
# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level state; _db_manager and _db are populated by initialize().
_db_manager = None
_db = None
_shutdown = False  # set True by handle_signal() to stop the main loop
# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None  # set by initialize_globals()
# When True, the dispatcher acquires/releases hosts for jobs inline with the
# tick; otherwise a DummyHostScheduler is used (see BaseDispatcher.__init__).
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)

_enable_ssp_container = global_config.global_config.get_config_value(
        'AUTOSERV', 'enable_ssp_container', type=bool,
        default=True)
mbligh36768f02008-02-22 18:28:33 +000077
mbligh83c1e9e2009-05-01 23:10:41 +000078def _site_init_monitor_db_dummy():
79 return {}
80
81
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled without a default."""
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise scheduler_lib.SchedulerError(
            'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000087
88
def _sanity_check():
    """Verify configuration consistency before the scheduler starts."""
    _verify_default_drone_set_exists()
92
93
def main():
    """Entry point: run the scheduler, always removing the pid file on exit.

    Any exception other than SystemExit is logged before being re-raised.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        # Deliberate exit; propagate untouched.
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000105
106
def main_without_exception_handling():
    """Parse options, initialize the scheduler, and run the tick loop.

    Expects exactly one positional argument: the results directory.
    Exits with status 1 if the scheduler is disabled in global config.
    The loop runs until a shutdown signal or the status server requests
    scheduler shutdown, then flushes emails and tears everything down.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Run the optional site-specific hook; falls back to the no-op dummy.
    site_init = utils.import_site_function(__file__,
            "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
            _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        # Testing mode: swap in the dummy autoserv binary.
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)
        minimum_tick_sec = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)

        while not _shutdown and not server._shutdown_scheduler:
            start = time.time()
            dispatcher.tick()
            curr_tick_sec = time.time() - start
            # Pad short ticks up to minimum_tick_sec; always yield briefly.
            if (minimum_tick_sec > curr_tick_sec):
                time.sleep(minimum_tick_sec - curr_tick_sec)
            else:
                time.sleep(0.0001)
    except Exception:
        email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    # Orderly shutdown: flush emails, stop the status server, drones, DB.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000190
191
def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler: ask the main loop to stop after this tick."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000196
197
def initialize():
    """One-time scheduler startup.

    Writes the pid file (refusing to start if another monitor_db is
    alive), connects to the database, installs signal handlers, and
    initializes module globals and the drone manager.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to run two schedulers against the same pid file.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point at the stress-test database instead of the real one.
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    # If server database is enabled, check if the server has role `scheduler`.
    # If the server does not have scheduler role, exception will be raised and
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    # Drone list comes from the server DB when enabled, otherwise from the
    # comma-separated 'drones' entry in global config.
    if server_manager_utils.use_server_db():
        drone_list = server_manager_utils.get_drones()
    else:
        drones = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
        drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000242
243
def initialize_globals():
    """Bind the process-wide drone manager singleton to this module."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
247
248
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """Build the autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @returns The autoserv command line as a list.
    """
    command = autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
    return command
showard87ba02a2009-04-20 19:37:32 +0000266
267
Simran Basia858a232012-08-21 11:04:37 -0700268class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800269
270
    def __init__(self):
        # Agents currently owned by this dispatcher.
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db)
        # Indexes mapping host id / queue entry id -> set of Agents; kept in
        # sync by _register_agent_for_ids / _unregister_agent_for_ids.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        # GC stats are logged at most once per gc_stats_interval_mins
        # (default 6 hours); see _garbage_collection().
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        # Verbosity switches consumed by _log_tick_msg / _log_extra_msg.
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
302
mbligh36768f02008-02-22 18:28:33 +0000303
showard915958d2009-04-22 21:00:58 +0000304 def initialize(self, recover_hosts=True):
305 self._periodic_cleanup.initialize()
306 self._24hr_upkeep.initialize()
307
jadmanski0afbb632008-06-06 21:10:57 +0000308 # always recover processes
309 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000310
jadmanski0afbb632008-06-06 21:10:57 +0000311 if recover_hosts:
312 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000313
314
Simran Basi0ec94dd2012-08-28 09:50:10 -0700315 def _log_tick_msg(self, msg):
316 if self._tick_debug:
317 logging.debug(msg)
318
319
Simran Basidef92872012-09-20 13:34:34 -0700320 def _log_extra_msg(self, msg):
321 if self._extra_debugging:
322 logging.debug(msg)
323
324
    def tick(self):
        """
        Run one full scheduler cycle.

        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.  The step order below is deliberate; do not
        rearrange without understanding the refresh/abort interplay.
        """
        timer = autotest_stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
        _drone_manager.trigger_refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        # NOTE(review): sync_refresh appears to complete the asynchronous
        # refresh started by trigger_refresh above — confirm in drone_manager.
        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
        _drone_manager.sync_refresh()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000368
showard97aed502008-11-04 02:01:24 +0000369
mblighf3294cc2009-04-08 21:17:38 +0000370 def _run_cleanup(self):
371 self._periodic_cleanup.run_cleanup_maybe()
372 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000373
mbligh36768f02008-02-22 18:28:33 +0000374
showardf13a9e22009-12-18 22:54:09 +0000375 def _garbage_collection(self):
376 threshold_time = time.time() - self._seconds_between_garbage_stats
377 if threshold_time < self._last_garbage_stats_time:
378 # Don't generate these reports very often.
379 return
380
381 self._last_garbage_stats_time = time.time()
382 # Force a full level 0 collection (because we can, it doesn't hurt
383 # at this interval).
384 gc.collect()
385 logging.info('Logging garbage collector stats on tick %d.',
386 self._tick_count)
387 gc_stats._log_garbage_collector_stats()
388
389
showard170873e2009-01-07 00:22:26 +0000390 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
391 for object_id in object_ids:
392 agent_dict.setdefault(object_id, set()).add(agent)
393
394
395 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
396 for object_id in object_ids:
397 assert object_id in agent_dict
398 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700399 # If an ID has no more active agent associated, there is no need to
400 # keep it in the dictionary. Otherwise, scheduler will keep an
401 # unnecessarily big dictionary until being restarted.
402 if not agent_dict[object_id]:
403 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000404
405
showardd1195652009-12-08 22:21:02 +0000406 def add_agent_task(self, agent_task):
beeps8bb1f7d2013-08-05 01:30:09 -0700407 """
408 Creates and adds an agent to the dispatchers list.
409
410 In creating the agent we also pass on all the queue_entry_ids and
411 host_ids from the special agent task. For every agent we create, we
412 add it to 1. a dict against the queue_entry_ids given to it 2. A dict
413 against the host_ids given to it. So theoritically, a host can have any
414 number of agents associated with it, and each of them can have any
415 special agent task, though in practice we never see > 1 agent/task per
416 host at any time.
417
418 @param agent_task: A SpecialTask for the agent to manage.
419 """
showardd1195652009-12-08 22:21:02 +0000420 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000421 self._agents.append(agent)
422 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000423 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
424 self._register_agent_for_ids(self._queue_entry_agents,
425 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000426
showard170873e2009-01-07 00:22:26 +0000427
428 def get_agents_for_entry(self, queue_entry):
429 """
430 Find agents corresponding to the specified queue_entry.
431 """
showardd3dc1992009-04-22 21:01:40 +0000432 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000433
434
435 def host_has_agent(self, host):
436 """
437 Determine if there is currently an Agent present using this host.
438 """
439 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000440
441
jadmanski0afbb632008-06-06 21:10:57 +0000442 def remove_agent(self, agent):
443 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000444 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
445 agent)
446 self._unregister_agent_for_ids(self._queue_entry_agents,
447 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000448
449
showard8cc058f2009-09-08 16:26:33 +0000450 def _host_has_scheduled_special_task(self, host):
451 return bool(models.SpecialTask.objects.filter(host__id=host.id,
452 is_active=False,
453 is_complete=False))
454
455
    def _recover_processes(self):
        """Recover agent tasks and their running processes after a restart.

        Pidfiles are registered before the drone manager refresh so the
        refresh picks up their contents, then tasks are re-attached to any
        surviving processes and leftover entries/hosts are handled.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000468
showard170873e2009-01-07 00:22:26 +0000469
showardd1195652009-12-08 22:21:02 +0000470 def _create_recovery_agent_tasks(self):
471 return (self._get_queue_entry_agent_tasks()
472 + self._get_special_task_agent_tasks(is_active=True))
473
474
475 def _get_queue_entry_agent_tasks(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700476 """
477 Get agent tasks for all hqe in the specified states.
478
479 Loosely this translates to taking a hqe in one of the specified states,
480 say parsing, and getting an AgentTask for it, like the FinalReparseTask,
481 through _get_agent_task_for_queue_entry. Each queue entry can only have
482 one agent task at a time, but there might be multiple queue entries in
483 the group.
484
485 @return: A list of AgentTasks.
486 """
showardd1195652009-12-08 22:21:02 +0000487 # host queue entry statuses handled directly by AgentTasks (Verifying is
488 # handled through SpecialTasks, so is not listed here)
489 statuses = (models.HostQueueEntry.Status.STARTING,
490 models.HostQueueEntry.Status.RUNNING,
491 models.HostQueueEntry.Status.GATHERING,
mbligh4608b002010-01-05 18:22:35 +0000492 models.HostQueueEntry.Status.PARSING,
493 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000494 status_list = ','.join("'%s'" % status for status in statuses)
jamesrenc44ae992010-02-19 00:12:54 +0000495 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardd1195652009-12-08 22:21:02 +0000496 where='status IN (%s)' % status_list)
Gabe Black1e1c41b2015-02-04 23:55:15 -0800497 autotest_stats.Gauge('scheduler.jobs_per_tick').send(
Alex Miller47cd2472013-11-25 15:20:04 -0800498 'running', len(queue_entries))
showardd1195652009-12-08 22:21:02 +0000499
500 agent_tasks = []
501 used_queue_entries = set()
502 for entry in queue_entries:
503 if self.get_agents_for_entry(entry):
504 # already being handled
505 continue
506 if entry in used_queue_entries:
507 # already picked up by a synchronous job
508 continue
509 agent_task = self._get_agent_task_for_queue_entry(entry)
510 agent_tasks.append(agent_task)
511 used_queue_entries.update(agent_task.queue_entries)
512 return agent_tasks
showard170873e2009-01-07 00:22:26 +0000513
514
showardd1195652009-12-08 22:21:02 +0000515 def _get_special_task_agent_tasks(self, is_active=False):
516 special_tasks = models.SpecialTask.objects.filter(
517 is_active=is_active, is_complete=False)
518 return [self._get_agent_task_for_special_task(task)
519 for task in special_tasks]
520
521
522 def _get_agent_task_for_queue_entry(self, queue_entry):
523 """
beeps8bb1f7d2013-08-05 01:30:09 -0700524 Construct an AgentTask instance for the given active HostQueueEntry.
525
showardd1195652009-12-08 22:21:02 +0000526 @param queue_entry: a HostQueueEntry
beeps8bb1f7d2013-08-05 01:30:09 -0700527 @return: an AgentTask to run the queue entry
showardd1195652009-12-08 22:21:02 +0000528 """
529 task_entries = queue_entry.job.get_group_entries(queue_entry)
530 self._check_for_duplicate_host_entries(task_entries)
531
532 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
533 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000534 if queue_entry.is_hostless():
535 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000536 return QueueTask(queue_entries=task_entries)
537 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
beeps5e2bb4a2013-10-28 11:26:45 -0700538 return postjob_task.GatherLogsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000539 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
beeps5e2bb4a2013-10-28 11:26:45 -0700540 return postjob_task.FinalReparseTask(queue_entries=task_entries)
mbligh4608b002010-01-05 18:22:35 +0000541 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
beeps5e2bb4a2013-10-28 11:26:45 -0700542 return postjob_task.ArchiveResultsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000543
Prashanth B0e960282014-05-13 19:38:28 -0700544 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800545 '_get_agent_task_for_queue_entry got entry with '
546 'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000547
548
549 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000550 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
551 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000552 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000553 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000554 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000555 if using_host:
showardd1195652009-12-08 22:21:02 +0000556 self._assert_host_has_no_agent(task_entry)
557
558
559 def _assert_host_has_no_agent(self, entry):
560 """
561 @param entry: a HostQueueEntry or a SpecialTask
562 """
563 if self.host_has_agent(entry.host):
564 agent = tuple(self._host_agents.get(entry.host.id))[0]
Prashanth B0e960282014-05-13 19:38:28 -0700565 raise scheduler_lib.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000566 'While scheduling %s, host %s already has a host agent %s'
567 % (entry, entry.host, agent.task))
568
569
570 def _get_agent_task_for_special_task(self, special_task):
571 """
572 Construct an AgentTask class to run the given SpecialTask and add it
573 to this dispatcher.
beeps8bb1f7d2013-08-05 01:30:09 -0700574
MK Ryu35d661e2014-09-25 17:44:10 -0700575 A special task is created through schedule_special_tasks, but only if
beeps8bb1f7d2013-08-05 01:30:09 -0700576 the host doesn't already have an agent. This happens through
577 add_agent_task. All special agent tasks are given a host on creation,
578 and a Null hqe. To create a SpecialAgentTask object, you need a
579 models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
580 object contains a hqe it's passed on to the special agent task, which
581 creates a HostQueueEntry and saves it as it's queue_entry.
582
showardd1195652009-12-08 22:21:02 +0000583 @param special_task: a models.SpecialTask instance
584 @returns an AgentTask to run this SpecialTask
585 """
586 self._assert_host_has_no_agent(special_task)
587
beeps5e2bb4a2013-10-28 11:26:45 -0700588 special_agent_task_classes = (prejob_task.CleanupTask,
589 prejob_task.VerifyTask,
590 prejob_task.RepairTask,
591 prejob_task.ResetTask,
592 prejob_task.ProvisionTask)
593
showardd1195652009-12-08 22:21:02 +0000594 for agent_task_class in special_agent_task_classes:
595 if agent_task_class.TASK_TYPE == special_task.task:
596 return agent_task_class(task=special_task)
597
Prashanth B0e960282014-05-13 19:38:28 -0700598 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800599 'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000600
601
602 def _register_pidfiles(self, agent_tasks):
603 for agent_task in agent_tasks:
604 agent_task.register_necessary_pidfiles()
605
606
607 def _recover_tasks(self, agent_tasks):
608 orphans = _drone_manager.get_orphaned_autoserv_processes()
609
610 for agent_task in agent_tasks:
611 agent_task.recover()
612 if agent_task.monitor and agent_task.monitor.has_process():
613 orphans.discard(agent_task.monitor.get_process())
614 self.add_agent_task(agent_task)
615
616 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000617
618
showard8cc058f2009-09-08 16:26:33 +0000619 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000620 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
621 % status):
showard0db3d432009-10-12 20:29:15 +0000622 if entry.status == status and not self.get_agents_for_entry(entry):
623 # The status can change during iteration, e.g., if job.run()
624 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000625 yield entry
626
627
showard6878e8b2009-07-20 22:37:45 +0000628 def _check_for_remaining_orphan_processes(self, orphans):
629 if not orphans:
630 return
631 subject = 'Unrecovered orphan autoserv processes remain'
632 message = '\n'.join(str(process) for process in orphans)
633 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000634
635 die_on_orphans = global_config.global_config.get_config_value(
636 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
637
638 if die_on_orphans:
639 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000640
showard170873e2009-01-07 00:22:26 +0000641
showard8cc058f2009-09-08 16:26:33 +0000642 def _recover_pending_entries(self):
643 for entry in self._get_unassigned_entries(
644 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000645 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000646 entry.on_pending()
647
648
showardb8900452009-10-12 20:31:01 +0000649 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000650 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000651 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
652 unrecovered_hqes = []
653 for queue_entry in queue_entries:
654 special_tasks = models.SpecialTask.objects.filter(
655 task__in=(models.SpecialTask.Task.CLEANUP,
656 models.SpecialTask.Task.VERIFY),
657 queue_entry__id=queue_entry.id,
658 is_complete=False)
659 if special_tasks.count() == 0:
660 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000661
showardb8900452009-10-12 20:31:01 +0000662 if unrecovered_hqes:
663 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700664 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000665 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000666 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000667
668
showard65db3932009-10-28 19:54:35 +0000669 def _schedule_special_tasks(self):
670 """
671 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700672
673 Special tasks include PreJobTasks like verify, reset and cleanup.
674 They are created through _schedule_new_jobs and associated with a hqe
675 This method translates SpecialTasks to the appropriate AgentTask and
676 adds them to the dispatchers agents list, so _handle_agents can execute
677 them.
showard65db3932009-10-28 19:54:35 +0000678 """
Prashanth B4ec98672014-05-15 10:44:54 -0700679 # When the host scheduler is responsible for acquisition we only want
680 # to run tasks with leased hosts. All hqe tasks will already have
681 # leased hosts, and we don't want to run frontend tasks till the host
682 # scheduler has vetted the assignment. Note that this doesn't include
683 # frontend tasks with hosts leased by other active hqes.
684 for task in self._job_query_manager.get_prioritized_special_tasks(
685 only_tasks_with_leased_hosts=not _inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000686 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000687 continue
showardd1195652009-12-08 22:21:02 +0000688 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000689
690
showard170873e2009-01-07 00:22:26 +0000691 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000692 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000693 # should never happen
showarded2afea2009-07-07 20:54:07 +0000694 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000695 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000696 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700697 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000698 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000699
700
jadmanski0afbb632008-06-06 21:10:57 +0000701 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000702 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700703 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000704 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000705 if self.host_has_agent(host):
706 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000707 continue
showard8cc058f2009-09-08 16:26:33 +0000708 if self._host_has_scheduled_special_task(host):
709 # host will have a special task scheduled on the next cycle
710 continue
showard170873e2009-01-07 00:22:26 +0000711 if print_message:
showardb18134f2009-03-20 20:52:18 +0000712 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000713 models.SpecialTask.objects.create(
714 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000715 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000716
717
jadmanski0afbb632008-06-06 21:10:57 +0000718 def _recover_hosts(self):
719 # recover "Repair Failed" hosts
720 message = 'Reverifying dead host %s'
721 self._reverify_hosts_where("status = 'Repair Failed'",
722 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000723
724
showard89f84db2009-03-12 20:39:13 +0000725 def _refresh_pending_queue_entries(self):
726 """
727 Lookup the pending HostQueueEntries and call our HostScheduler
728 refresh() method given that list. Return the list.
729
730 @returns A list of pending HostQueueEntries sorted in priority order.
731 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700732 queue_entries = self._job_query_manager.get_pending_queue_entries(
733 only_hostless=not _inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000734 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000735 return []
showard89f84db2009-03-12 20:39:13 +0000736 return queue_entries
737
738
    def _schedule_hostless_job(self, queue_entry):
        """Schedule a hostless (suite) job.

        Wraps the entry in a HostlessQueueTask agent and moves it to
        Starting.

        @param queue_entry: The queue_entry representing the hostless job.
        """
        self.add_agent_task(HostlessQueueTask(queue_entry))

        # Need to set execution_subdir before setting the status:
        # After a restart of the scheduler, agents will be restored for HQEs in
        # Starting, Running, Gathering, Parsing or Archiving. To do this, the
        # execution_subdir is needed. Therefore it must be set before entering
        # one of these states.
        # Otherwise, if the scheduler was interrupted between setting the status
        # and the execution_subdir, upon its restart restoring agents would
        # fail.
        # Is there a way to get a status in one of these states without going
        # through this code? Following cases are possible:
        # - If it's aborted before being started:
        #   active bit will be 0, so there's nothing to parse, it will just be
        #   set to completed by _find_aborting. Critical statuses are skipped.
        # - If it's aborted or it fails after being started:
        #   It was started, so this code was executed.
        queue_entry.update_field('execution_subdir', 'hostless')
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000763
764
beepscc9fc702013-12-02 12:45:38 -0800765 def _schedule_host_job(self, host, queue_entry):
766 """Schedules a job on the given host.
767
768 1. Assign the host to the hqe, if it isn't already assigned.
769 2. Create a SpecialAgentTask for the hqe.
770 3. Activate the hqe.
771
772 @param queue_entry: The job to schedule.
773 @param host: The host to schedule the job on.
774 """
775 if self.host_has_agent(host):
776 host_agent_task = list(self._host_agents.get(host.id))[0].task
777 subject = 'Host with agents assigned to an HQE'
778 message = ('HQE: %s assigned host %s, but the host has '
779 'agent: %s for queue_entry %s. The HQE '
beepsf7d35162014-02-13 16:42:13 -0800780 'will have to try and acquire a host next tick ' %
beepscc9fc702013-12-02 12:45:38 -0800781 (queue_entry, host.hostname, host_agent_task,
782 host_agent_task.queue_entry))
783 email_manager.manager.enqueue_notify_email(subject, message)
beepscc9fc702013-12-02 12:45:38 -0800784 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700785 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800786
787
showard89f84db2009-03-12 20:39:13 +0000788 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700789 """
790 Find any new HQEs and call schedule_pre_job_tasks for it.
791
792 This involves setting the status of the HQE and creating a row in the
793 db corresponding the the special task, through
794 scheduler_models._queue_special_task. The new db row is then added as
795 an agent to the dispatcher through _schedule_special_tasks and
796 scheduled for execution on the drone through _handle_agents.
797 """
showard89f84db2009-03-12 20:39:13 +0000798 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000799
beepscc9fc702013-12-02 12:45:38 -0800800 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700801 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700802 new_jobs_with_hosts = 0
803 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800804 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700805 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000806
beepscc9fc702013-12-02 12:45:38 -0800807 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000808 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000809 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700810 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000811 else:
beepscc9fc702013-12-02 12:45:38 -0800812 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700813 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700814
Gabe Black1e1c41b2015-02-04 23:55:15 -0800815 autotest_stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
beepscc9fc702013-12-02 12:45:38 -0800816 if not host_jobs:
817 return
Prashanth Bf66d51b2014-05-06 12:42:25 -0700818 if not _inline_host_acquisition:
819 message = ('Found %s jobs that need hosts though '
820 '_inline_host_acquisition=%s. Will acquire hosts.' %
821 ([str(job) for job in host_jobs],
822 _inline_host_acquisition))
823 email_manager.manager.enqueue_notify_email(
824 'Processing unexpected host acquisition requests', message)
825 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
826 for host_assignment in jobs_with_hosts:
827 self._schedule_host_job(host_assignment.host, host_assignment.job)
828 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800829
Gabe Black1e1c41b2015-02-04 23:55:15 -0800830 autotest_stats.Gauge(key).send('new_jobs_with_hosts',
831 new_jobs_with_hosts)
832 autotest_stats.Gauge(key).send('new_jobs_without_hosts',
833 new_jobs_need_hosts -
834 new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000835
836
showard8cc058f2009-09-08 16:26:33 +0000837 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700838 """
839 Adds agents to the dispatcher.
840
841 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
842 QueueTask for example, will have a job with a control file, and
843 the agent will have methods that poll, abort and check if the queue
844 task is finished. The dispatcher runs the agent_task, as well as
845 other agents in it's _agents member, through _handle_agents, by
846 calling the Agents tick().
847
848 This method creates an agent for each HQE in one of (starting, running,
849 gathering, parsing, archiving) states, and adds it to the dispatcher so
850 it is handled by _handle_agents.
851 """
showardd1195652009-12-08 22:21:02 +0000852 for agent_task in self._get_queue_entry_agent_tasks():
853 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000854
855
856 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000857 for entry in scheduler_models.HostQueueEntry.fetch(
858 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000859 task = entry.job.schedule_delayed_callback_task(entry)
860 if task:
showardd1195652009-12-08 22:21:02 +0000861 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000862
863
    def _find_aborting(self):
        """
        Looks through the afe_host_queue_entries for an aborted entry.

        The aborted bit is set on an HQE in many ways, the most common
        being when a user requests an abort through the frontend, which
        results in an rpc from the afe to abort_host_queue_entries.
        """
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted=1 and complete=0'):

            # If the job is running on a shard, let the shard handle aborting
            # it and sync back the right status.
            if entry.job.shard_id is not None and not server_utils.is_shard():
                logging.info('Waiting for shard %s to abort hqe %s',
                             entry.job.shard_id, entry)
                continue

            logging.info('Aborting %s', entry)

            # The task would have started off with both is_complete and
            # is_active = False. Aborted tasks are neither active nor complete.
            # For all currently active tasks this will happen through the agent,
            # but we need to manually update the special tasks that haven't
            # started yet, because they don't have agents.
            models.SpecialTask.objects.filter(is_active=False,
                    queue_entry_id=entry.id).update(is_complete=True)

            # Abort any live agents for this entry, then the entry itself.
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        # Jobs are collected in a set so each is stopped once, even if
        # several of its entries were aborted this tick.
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000900
901
    def _find_aborted_special_tasks(self):
        """
        Find SpecialTasks that have been marked for abortion.

        Poll the database looking for SpecialTasks that are active
        and have been marked for abortion, then abort them.
        """

        # The completed and active bits are very important when it comes
        # to scheduler correctness. The active bit is set through the prolog
        # of a special task, and reset through the cleanup method of the
        # SpecialAgentTask. The cleanup is called both through the abort and
        # epilog. The complete bit is set in several places, and in general
        # a hanging job will have is_active=1 is_complete=0, while a special
        # task which completed will have is_active=0 is_complete=1. To check
        # aborts we directly check active because the complete bit is set in
        # several places, including the epilog of agent tasks.
        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
                                                          is_aborted=True)
        for task in aborted_tasks:
            # There are 2 ways to get the agent associated with a task,
            # through the host and through the hqe. A special task
            # always needs a host, but doesn't always need a hqe.
            for agent in self._host_agents.get(task.host.id, []):
                if isinstance(agent.task, agent_task.SpecialAgentTask):

                    # The epilog performs critical actions such as
                    # queueing the next SpecialTask, requeuing the
                    # hqe etc, however it doesn't actually kill the
                    # monitor process and set the 'done' bit. Epilogs
                    # assume that the job failed, and that the monitor
                    # process has already written an exit code. The
                    # done bit is a necessary condition for
                    # _handle_agents to schedule any more special
                    # tasks against the host, and it must be set
                    # in addition to is_active, is_complete and success.
                    agent.task.epilog()
                    agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700940
941
showard324bf812009-01-20 23:23:38 +0000942 def _can_start_agent(self, agent, num_started_this_cycle,
943 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000944 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000945 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000946 return True
947 # don't allow any nonzero-process agents to run after we've reached a
948 # limit (this avoids starvation of many-process agents)
949 if have_reached_limit:
950 return False
951 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000952 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000953 agent.task.owner_username,
954 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000955 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000956 return False
957 # if a single agent exceeds the per-cycle throttling, still allow it to
958 # run when it's the first agent in the cycle
959 if num_started_this_cycle == 0:
960 return True
961 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +0000962 if (num_started_this_cycle + agent.task.num_processes >
963 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000964 return False
965 return True
966
967
    def _handle_agents(self):
        """
        Handles agents of the dispatcher.

        Appropriate Agents are added to the dispatcher through
        _schedule_running_host_queue_entries. These agents each
        have a task. This method runs the agents task through
        agent.tick() leading to:
            agent.start
                prolog -> AgentTasks prolog
                For each queue entry:
                    sets host status/status to Running
                    set started_on in afe_host_queue_entries
                run -> AgentTasks run
                    Creates PidfileRunMonitor
                    Queues the autoserv command line for this AgentTask
                    via the drone manager. These commands are executed
                    through the drone managers execute actions.
            poll -> AgentTasks/BaseAgentTask poll
                checks the monitors exit_code.
                Executes epilog if task is finished.
                Executes AgentTasks _finish_task
                finish_task is usually responsible for setting the status
                of the HQE/host, and updating its active and complete fields.

            agent.is_done
                Removes the agent from the dispatchers _agents queue.
                Is_done checks the finished bit on the agent, that is
                set based on the Agents task. During the agents poll
                we check to see if the monitor process has exited in
                its finish method, and set the success member of the
                task based on this exit code.
        """
        num_started_this_cycle = 0
        num_finished_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                # Throttle: once one agent is refused, all later nonzero
                # agents this cycle are refused too (see _can_start_agent).
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_cycle += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                num_finished_this_cycle += agent.task.num_processes
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
                'agents_started', num_started_this_cycle)
        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
                'agents_finished', num_finished_this_cycle)
        logging.info('%d running processes. %d added this cycle.',
                     _drone_manager.total_running_processes(),
                     num_started_this_cycle)
mbligh36768f02008-02-22 18:28:33 +00001031
1032
    def _process_recurring_runs(self):
        """Launch jobs for RecurringRun templates whose start date passed.

        For each due RecurringRun, a new job is created from the stored
        template. A loop_count of 1 deletes the run after launching; 0 means
        repeat forever; any other value decrements and advances start_date
        by loop_period seconds.
        """
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            # NOTE(review): 'dependencies' is unpacked but never used below;
            # dependencies presumably come through job.get_object_dict().
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                # Creation failure should not stop other recurring runs.
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1072
1073
# Site-specific extension point: if autotest_lib.scheduler.site_monitor_db
# provides a SiteDispatcher class, it is used; otherwise BaseDispatcher is
# used unmodified.
SiteDispatcher = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteDispatcher', BaseDispatcher)


class Dispatcher(SiteDispatcher):
    # Concrete dispatcher type instantiated by the scheduler entry point.
    pass
1080
1081
class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task. An agent wraps
    around an AgentTask mainly to associate the AgentTask with the queue_entry
    and host ids.

    The wrapped task must provide:
        poll() - called periodically to let the task check its status and
                update its internal state.
        is_done() - returns True if the task is finished.
        abort() - called when an abort has been requested; the task must set
                its aborted attribute to True if it actually aborted.

    and the attributes:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - sequence of HostQueueEntry ids this task handles.
        host_ids - sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: An instance of an AgentTask.
        """
        self.task = task

        # This is filled in by Dispatcher.add_agent()
        self.dispatcher = None

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        """Poll the wrapped task once; record completion when it finishes."""
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        """@returns True once the wrapped task has completed."""
        return self.finished


    def abort(self):
        """Forward an abort to the task; tasks can choose to ignore aborts."""
        if not self.task:
            return
        self.task.abort()
        if self.task.aborted:
            self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001137
showardd3dc1992009-04-22 21:01:40 +00001138
beeps5e2bb4a2013-10-28 11:26:45 -07001139class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001140 """
1141 Common functionality for QueueTask and HostlessQueueTask
1142 """
    def __init__(self, queue_entries):
        """
        @param queue_entries: non-empty list of queue entries that run
                together; the job is taken from the first entry.
        """
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001147
1148
showard73ec0442009-02-07 02:05:20 +00001149 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001150 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001151
1152
jamesrenc44ae992010-02-19 00:12:54 +00001153 def _write_control_file(self, execution_path):
1154 control_path = _drone_manager.attach_file_to_execution(
1155 execution_path, self.job.control_file)
1156 return control_path
1157
1158
    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _command_line(self):
        """Build the autoserv command line for this job.

        Attaches the control file to the execution directory and composes
        the invocation covering every non-hostless queue entry.

        @returns list of command-line arguments for autoserv.
        """
        execution_path = self.queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        # Hostless entries contribute no hostname.
        hostnames = ','.join(entry.host.hostname
                             for entry in self.queue_entries
                             if not entry.is_hostless())

        execution_tag = self.queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self.job, verbose=False)
        if self.job.is_image_update_job():
            params += ['--image', self.job.update_image_path]

        return params
showardd1195652009-12-08 22:21:02 +00001177
1178
    @property
    def num_processes(self):
        # One process slot per queue entry in the group.
        return len(self.queue_entries)
1182
1183
    @property
    def owner_username(self):
        # Owner of the job backing this task; used for per-user throttling.
        return self.job.owner
1187
1188
    def _working_directory(self):
        # Execution path shared by all queue entries in the group.
        return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001191
1192
jadmanski0afbb632008-06-06 21:10:57 +00001193 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001194 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001195 keyval_dict = self.job.keyval_dict()
1196 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001197 group_name = self.queue_entries[0].get_group_name()
1198 if group_name:
1199 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001200 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001201 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001202 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001203 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001204
1205
showard35162b02009-03-03 02:17:30 +00001206 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001207 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001208 _drone_manager.write_lines_to_file(error_file_path,
1209 [_LOST_PROCESS_ERROR])
1210
1211
    def _finish_task(self):
        """Write the job-finished keyval; flag lost autoserv processes."""
        # No monitor means the task never launched a process: nothing to do.
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001220
jadmanskif7fa2cc2008-10-01 14:13:23 +00001221
showardcbd74612008-11-19 21:42:02 +00001222 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001223 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001224 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001225 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001226 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001227
1228
jadmanskif7fa2cc2008-10-01 14:13:23 +00001229 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001230 if not self.monitor or not self.monitor.has_process():
1231 return
1232
jadmanskif7fa2cc2008-10-01 14:13:23 +00001233 # build up sets of all the aborted_by and aborted_on values
1234 aborted_by, aborted_on = set(), set()
1235 for queue_entry in self.queue_entries:
1236 if queue_entry.aborted_by:
1237 aborted_by.add(queue_entry.aborted_by)
1238 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1239 aborted_on.add(t)
1240
1241 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001242 # TODO(showard): this conditional is now obsolete, we just need to leave
1243 # it in temporarily for backwards compatibility over upgrades. delete
1244 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001245 assert len(aborted_by) <= 1
1246 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001247 aborted_by_value = aborted_by.pop()
1248 aborted_on_value = max(aborted_on)
1249 else:
1250 aborted_by_value = 'autotest_system'
1251 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001252
showarda0382352009-02-11 23:36:43 +00001253 self._write_keyval_after_job("aborted_by", aborted_by_value)
1254 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001255
showardcbd74612008-11-19 21:42:02 +00001256 aborted_on_string = str(datetime.datetime.fromtimestamp(
1257 aborted_on_value))
1258 self._write_status_comment('Job aborted by %s on %s' %
1259 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001260
1261
    def abort(self):
        """Abort the task: run the parent abort, then record abort details
        and finalize job bookkeeping."""
        super(AbstractQueueTask, self).abort()
        # Log abort info before _finish_task so the status comment can still
        # be paired with the monitored process.
        self._log_abort()
        self._finish_task()
showard21baa452008-10-21 00:08:39 +00001266
1267
    def epilog(self):
        """Normal-completion path: finalize job bookkeeping after the parent
        epilog runs."""
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001271
1272
class QueueTask(AbstractQueueTask):
    """AbstractQueueTask for ordinary, host-backed job executions."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Validate entry/host statuses, then mark each host RUNNING and
        dirty before the job starts."""
        allowed_hqe = (models.HostQueueEntry.Status.STARTING,
                       models.HostQueueEntry.Status.RUNNING)
        allowed_host = (models.Host.Status.PENDING,
                        models.Host.Status.RUNNING)
        self._check_queue_entry_statuses(self.queue_entries,
                                         allowed_hqe_statuses=allowed_hqe,
                                         allowed_host_statuses=allowed_host)

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            host = entry.host
            self._write_host_keyvals(host)
            host.set_status(models.Host.Status.RUNNING)
            host.update_field('dirty', 1)


    def _finish_task(self):
        """Move entries to GATHERING; hosts stay RUNNING for log collection."""
        super(QueueTask, self)._finish_task()

        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Extend the base autoserv invocation with the server-side-packaging
        flag (when applicable) and the repo-url verification flag."""
        invocation = super(QueueTask, self)._command_line()
        # Server-side packaging applies only to server-control jobs, and only
        # when the job has not explicitly opted out. The `!= False` test is
        # deliberate: a None (unset) require_ssp still enables SSP.
        is_server_job = (
                self.job.control_type == control_data.CONTROL_TYPE.SERVER)
        if (_enable_ssp_container and is_server_job
                and self.job.require_ssp != False):
            invocation += ['--require-ssp']
        return invocation + ['--verify_job_repo_url']
1311
1312
class HostlessQueueTask(AbstractQueueTask):
    """AbstractQueueTask for jobs that run without any host assigned."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()

        # A newly added job starts out in Starting status; a scheduler tick
        # moves it to Running only once an autoserv process has actually been
        # launched on a drone (limits such as max_hostless_jobs_per_drone or
        # max_processes_started_per_cycle can leave it in Starting). If the
        # entry is still Starting here, no process ever ran, so there are no
        # logs to parse -- and attempting to parse would raise, because the
        # entry's execution_subdir has no value yet.
        entry = self.queue_entries[0]
        if entry.status != models.HostQueueEntry.Status.STARTING:
            entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001341
1342
if __name__ == '__main__':
    # Script entry point: run the scheduler's main loop (main() is defined
    # elsewhere in this module).
    main()