blob: ff377ffb33bba22e0523fbfd19cd102ef325c372 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
mbligh36768f02008-02-22 18:28:33 +00002"""
3Autotest scheduler
4"""
showard909c7a62008-07-15 21:52:38 +00005
Dan Shif6c65bd2014-08-29 16:15:07 -07006import datetime
7import gc
8import logging
9import optparse
10import os
11import signal
12import sys
13import time
showard402934a2009-12-21 22:20:47 +000014
Alex Miller05d7b4c2013-03-04 07:49:38 -080015import common
showard21baa452008-10-21 00:08:39 +000016from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000017
18import django.db
19
Dan Shiec1d47d2015-02-13 11:38:13 -080020from autotest_lib.client.common_lib import control_data
Prashanth B0e960282014-05-13 19:38:28 -070021from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070022from autotest_lib.client.common_lib import utils
Gabe Black1e1c41b2015-02-04 23:55:15 -080023from autotest_lib.client.common_lib.cros.graphite import autotest_stats
Prashanth B0e960282014-05-13 19:38:28 -070024from autotest_lib.frontend.afe import models, rpc_utils
Fang Dengc330bee2014-10-21 18:10:55 -070025from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070026from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
27from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070028from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070029from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070030from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000031from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080032from autotest_lib.scheduler import status_server, scheduler_config
Aviv Keshet308e7362013-05-21 14:43:16 -070033from autotest_lib.server import autoserv_utils
Dan Shi114e1722016-01-10 18:12:53 -080034from autotest_lib.server import system_utils
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080035from autotest_lib.server import utils as server_utils
Dan Shicf2e8dd2015-05-07 17:18:48 -070036from autotest_lib.site_utils import metadata_reporter
Dan Shib9144a42014-12-01 16:09:32 -080037from autotest_lib.site_utils import server_manager_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080038
Dan Shicf2e8dd2015-05-07 17:18:48 -070039
# Pidfile prefixes used with utils.write_pid/program_is_alive to identify the
# scheduler process and its babysitter wrapper.
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Overwritten with the results_dir command-line argument in
# main_without_exception_handling().
RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# $AUTOTEST_DIR, when set, overrides the path derived from this file's
# location.  (dict.has_key: this file targets Python 2.)
if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level scheduler state, populated by initialize().
_db_manager = None  # scheduler_lib.ConnectionManager, set in initialize()
_db = None          # connection obtained from _db_manager in initialize()
_shutdown = False   # flipped to True by handle_signal() to stop the main loop

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None  # drone_manager singleton, bound in initialize_globals()

# When True the dispatcher acquires/releases hosts inline with the tick; see
# BaseDispatcher.__init__ for how this selects the host scheduler class.
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)

# NOTE(review): presumably gates autoserv's server-side packaging (SSP)
# containers -- confirm against the AUTOSERV config section consumers.
_enable_ssp_container = global_config.global_config.get_config_value(
        'AUTOSERV', 'enable_ssp_container', type=bool,
        default=True)
mbligh36768f02008-02-22 18:28:33 +000077
mbligh83c1e9e2009-05-01 23:10:41 +000078def _site_init_monitor_db_dummy():
79 return {}
80
81
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled without a default.

    Preserves short-circuit behavior: the default name is only looked up
    when drone sets are actually enabled.
    """
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise scheduler_lib.SchedulerError(
            'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000087
88
def _sanity_check():
    """Verify configuration consistency before the scheduler starts."""
    _verify_default_drone_set_exists()
92
93
def main():
    """Top-level scheduler entry point.

    Runs main_without_exception_handling() and logs any exception that
    escapes it; SystemExit passes through untouched.  The bare ``except``
    deliberately catches everything (Python 2) so the escape is recorded
    before re-raising.  The pidfile is always removed on the way out.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        # Clean up the pidfile written in initialize(), crash or not.
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000105
106
def main_without_exception_handling():
    """Parse arguments, start the scheduler and run the tick loop.

    Startup order: logging, option parsing, production-settings check,
    enable_scheduler gate, site hook, status server, metadata reporter,
    then the dispatcher tick loop.  On loop exit (shutdown request or
    uncaught exception) the reporter, status server, drone manager and
    database connection are torn down in that order.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        # Exactly one positional argument (results_dir) is required.
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Optional site-specific initialization hook; falls back to the no-op
    # _site_init_monitor_db_dummy when no site module is present.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        # Swap in the dummy autoserv binary for scheduler testing.
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    # Start the thread to report metadata.
    metadata_reporter.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)
        minimum_tick_sec = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)

        while not _shutdown and not server._shutdown_scheduler:
            start = time.time()
            dispatcher.tick()
            curr_tick_sec = time.time() - start
            # Pad short ticks up to minimum_tick_sec; always yield briefly.
            if (minimum_tick_sec > curr_tick_sec):
                time.sleep(minimum_tick_sec - curr_tick_sec)
            else:
                time.sleep(0.0001)
    except Exception:
        email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    metadata_reporter.abort()
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000194
195
def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler: request a clean exit of the main loop.

    @param signum: delivered signal number (unused).
    @param frame: interrupted stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000200
201
def initialize():
    """One-time scheduler startup: pidfile, DB, signals, drone manager.

    Exits the process if another monitor_db instance already holds the
    pidfile, or (when the server DB is in use) if this host lacks the
    'scheduler' role.  Populates the module globals _db_manager, _db and
    _drone_manager.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start a second scheduler against the same pidfile.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point at the stress-test database instead of the real one.
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    # If server database is enabled, check if the server has role `scheduler`.
    # If the server does not have scheduler role, exception will be raised and
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    drone_list = system_utils.get_drones()
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000241
242
def initialize_globals():
    """Bind the module-level _drone_manager to the drone manager singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
246
247
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                          verbose=True):
    """Build the autoserv command line for a job.

    @param machines: string - a machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args: list - additional arguments to pass to autoserv.
    @param job: Job object - if supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry: a HostQueueEntry object - if supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose: whether to pass the verbose flag through to autoserv.
    @returns the autoserv command line as a list of executable + parameters.
    """
    return autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose, in_lab=True)
showard87ba02a2009-04-20 19:37:32 +0000266
267
Simran Basia858a232012-08-21 11:04:37 -0700268class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800269
270
    def __init__(self):
        """Set up dispatcher state: agents, cleanups, and debug settings."""
        # Active Agent objects, plus per-host and per-queue-entry indexes
        # maintained by _register_agent_for_ids/_unregister_agent_for_ids.
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db, _drone_manager)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        # Throttle for _garbage_collection(): stats are logged at most once
        # per gc_stats_interval_mins (default 6 hours).
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        # Debug logging toggles consumed by _log_tick_msg/_log_extra_msg.
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
302
mbligh36768f02008-02-22 18:28:33 +0000303
    def initialize(self, recover_hosts=True):
        """Prepare the dispatcher: run cleanups and recover prior state.

        @param recover_hosts: when True, also attempt to recover dead hosts
                (process recovery always runs).
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()
        # Execute all actions queued in the cleanup tasks. Scheduler tick will
        # run a refresh task first. If there is any action in the queue, refresh
        # will raise an exception.
        _drone_manager.execute_actions()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000317
318
Simran Basi0ec94dd2012-08-28 09:50:10 -0700319 def _log_tick_msg(self, msg):
320 if self._tick_debug:
321 logging.debug(msg)
322
323
Simran Basidef92872012-09-20 13:34:34 -0700324 def _log_extra_msg(self, msg):
325 if self._extra_debugging:
326 logging.debug(msg)
327
328
    def tick(self):
        """
        Run one scheduler cycle, logging when each major step begins so we
        can figure out where the tick time is being spent.

        Step ordering is significant; in particular _run_cleanup must sit
        between sync_refresh and execute_actions (see inline comment).
        """
        timer = autotest_stats.Timer('scheduler.tick')
        system_utils.DroneCache.refresh()
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
        _drone_manager.trigger_refresh()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
        _drone_manager.sync_refresh()
        # _run_cleanup must be called between drone_manager.sync_refresh, and
        # drone_manager.execute_actions, as sync_refresh will clear the calls
        # queued in drones. Therefore, any action that calls drone.queue_call
        # to add calls to the drone._calls, should be after drone refresh is
        # completed and before drone_manager.execute_actions at the end of the
        # tick.
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000379
showard97aed502008-11-04 02:01:24 +0000380
mblighf3294cc2009-04-08 21:17:38 +0000381 def _run_cleanup(self):
382 self._periodic_cleanup.run_cleanup_maybe()
383 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000384
mbligh36768f02008-02-22 18:28:33 +0000385
showardf13a9e22009-12-18 22:54:09 +0000386 def _garbage_collection(self):
387 threshold_time = time.time() - self._seconds_between_garbage_stats
388 if threshold_time < self._last_garbage_stats_time:
389 # Don't generate these reports very often.
390 return
391
392 self._last_garbage_stats_time = time.time()
393 # Force a full level 0 collection (because we can, it doesn't hurt
394 # at this interval).
395 gc.collect()
396 logging.info('Logging garbage collector stats on tick %d.',
397 self._tick_count)
398 gc_stats._log_garbage_collector_stats()
399
400
showard170873e2009-01-07 00:22:26 +0000401 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
402 for object_id in object_ids:
403 agent_dict.setdefault(object_id, set()).add(agent)
404
405
406 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
407 for object_id in object_ids:
408 assert object_id in agent_dict
409 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700410 # If an ID has no more active agent associated, there is no need to
411 # keep it in the dictionary. Otherwise, scheduler will keep an
412 # unnecessarily big dictionary until being restarted.
413 if not agent_dict[object_id]:
414 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000415
416
showardd1195652009-12-08 22:21:02 +0000417 def add_agent_task(self, agent_task):
beeps8bb1f7d2013-08-05 01:30:09 -0700418 """
419 Creates and adds an agent to the dispatchers list.
420
421 In creating the agent we also pass on all the queue_entry_ids and
422 host_ids from the special agent task. For every agent we create, we
423 add it to 1. a dict against the queue_entry_ids given to it 2. A dict
424 against the host_ids given to it. So theoritically, a host can have any
425 number of agents associated with it, and each of them can have any
426 special agent task, though in practice we never see > 1 agent/task per
427 host at any time.
428
429 @param agent_task: A SpecialTask for the agent to manage.
430 """
showardd1195652009-12-08 22:21:02 +0000431 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000432 self._agents.append(agent)
433 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000434 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
435 self._register_agent_for_ids(self._queue_entry_agents,
436 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000437
showard170873e2009-01-07 00:22:26 +0000438
439 def get_agents_for_entry(self, queue_entry):
440 """
441 Find agents corresponding to the specified queue_entry.
442 """
showardd3dc1992009-04-22 21:01:40 +0000443 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000444
445
446 def host_has_agent(self, host):
447 """
448 Determine if there is currently an Agent present using this host.
449 """
450 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000451
452
jadmanski0afbb632008-06-06 21:10:57 +0000453 def remove_agent(self, agent):
454 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000455 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
456 agent)
457 self._unregister_agent_for_ids(self._queue_entry_agents,
458 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000459
460
showard8cc058f2009-09-08 16:26:33 +0000461 def _host_has_scheduled_special_task(self, host):
462 return bool(models.SpecialTask.objects.filter(host__id=host.id,
463 is_active=False,
464 is_complete=False))
465
466
    def _recover_processes(self):
        """Re-attach to processes left behind by a previous scheduler run.

        Builds recovery AgentTasks, registers their pidfiles, refreshes the
        drone manager, recovers the tasks and pending entries, then
        reverifies whatever hosts are still unaccounted for.  Ordering
        matters: pidfiles must be registered before the refresh.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000479
showard170873e2009-01-07 00:22:26 +0000480
showardd1195652009-12-08 22:21:02 +0000481 def _create_recovery_agent_tasks(self):
482 return (self._get_queue_entry_agent_tasks()
483 + self._get_special_task_agent_tasks(is_active=True))
484
485
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)
        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
                'running', len(queue_entries))

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            # A multi-entry (synchronous) task claims all its entries at once.
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000524
525
showardd1195652009-12-08 22:21:02 +0000526 def _get_special_task_agent_tasks(self, is_active=False):
527 special_tasks = models.SpecialTask.objects.filter(
528 is_active=is_active, is_complete=False)
529 return [self._get_agent_task_for_special_task(task)
530 for task in special_tasks]
531
532
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        @raises scheduler_lib.SchedulerError: if the entry's status is not one
                handled directly by AgentTasks (STARTING/RUNNING/GATHERING/
                PARSING/ARCHIVING).
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            # Hostless entries get their own lightweight task type.
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000558
559
560 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000561 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
562 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000563 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000564 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000565 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000566 if using_host:
showardd1195652009-12-08 22:21:02 +0000567 self._assert_host_has_no_agent(task_entry)
568
569
570 def _assert_host_has_no_agent(self, entry):
571 """
572 @param entry: a HostQueueEntry or a SpecialTask
573 """
574 if self.host_has_agent(entry.host):
575 agent = tuple(self._host_agents.get(entry.host.id))[0]
Prashanth B0e960282014-05-13 19:38:28 -0700576 raise scheduler_lib.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000577 'While scheduling %s, host %s already has a host agent %s'
578 % (entry, entry.host, agent.task))
579
580
581 def _get_agent_task_for_special_task(self, special_task):
582 """
583 Construct an AgentTask class to run the given SpecialTask and add it
584 to this dispatcher.
beeps8bb1f7d2013-08-05 01:30:09 -0700585
MK Ryu35d661e2014-09-25 17:44:10 -0700586 A special task is created through schedule_special_tasks, but only if
beeps8bb1f7d2013-08-05 01:30:09 -0700587 the host doesn't already have an agent. This happens through
588 add_agent_task. All special agent tasks are given a host on creation,
589 and a Null hqe. To create a SpecialAgentTask object, you need a
590 models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
591 object contains a hqe it's passed on to the special agent task, which
592 creates a HostQueueEntry and saves it as it's queue_entry.
593
showardd1195652009-12-08 22:21:02 +0000594 @param special_task: a models.SpecialTask instance
595 @returns an AgentTask to run this SpecialTask
596 """
597 self._assert_host_has_no_agent(special_task)
598
beeps5e2bb4a2013-10-28 11:26:45 -0700599 special_agent_task_classes = (prejob_task.CleanupTask,
600 prejob_task.VerifyTask,
601 prejob_task.RepairTask,
602 prejob_task.ResetTask,
603 prejob_task.ProvisionTask)
604
showardd1195652009-12-08 22:21:02 +0000605 for agent_task_class in special_agent_task_classes:
606 if agent_task_class.TASK_TYPE == special_task.task:
607 return agent_task_class(task=special_task)
608
Prashanth B0e960282014-05-13 19:38:28 -0700609 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800610 'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000611
612
613 def _register_pidfiles(self, agent_tasks):
614 for agent_task in agent_tasks:
615 agent_task.register_necessary_pidfiles()
616
617
618 def _recover_tasks(self, agent_tasks):
619 orphans = _drone_manager.get_orphaned_autoserv_processes()
620
621 for agent_task in agent_tasks:
622 agent_task.recover()
623 if agent_task.monitor and agent_task.monitor.has_process():
624 orphans.discard(agent_task.monitor.get_process())
625 self.add_agent_task(agent_task)
626
627 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000628
629
showard8cc058f2009-09-08 16:26:33 +0000630 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000631 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
632 % status):
showard0db3d432009-10-12 20:29:15 +0000633 if entry.status == status and not self.get_agents_for_entry(entry):
634 # The status can change during iteration, e.g., if job.run()
635 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000636 yield entry
637
638
showard6878e8b2009-07-20 22:37:45 +0000639 def _check_for_remaining_orphan_processes(self, orphans):
640 if not orphans:
641 return
642 subject = 'Unrecovered orphan autoserv processes remain'
643 message = '\n'.join(str(process) for process in orphans)
644 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000645
646 die_on_orphans = global_config.global_config.get_config_value(
647 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
648
649 if die_on_orphans:
650 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000651
showard170873e2009-01-07 00:22:26 +0000652
showard8cc058f2009-09-08 16:26:33 +0000653 def _recover_pending_entries(self):
654 for entry in self._get_unassigned_entries(
655 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000656 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000657 entry.on_pending()
658
659
showardb8900452009-10-12 20:31:01 +0000660 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000661 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000662 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
663 unrecovered_hqes = []
664 for queue_entry in queue_entries:
665 special_tasks = models.SpecialTask.objects.filter(
666 task__in=(models.SpecialTask.Task.CLEANUP,
667 models.SpecialTask.Task.VERIFY),
668 queue_entry__id=queue_entry.id,
669 is_complete=False)
670 if special_tasks.count() == 0:
671 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000672
showardb8900452009-10-12 20:31:01 +0000673 if unrecovered_hqes:
674 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700675 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000676 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000677 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000678
679
showard65db3932009-10-28 19:54:35 +0000680 def _schedule_special_tasks(self):
681 """
682 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700683
684 Special tasks include PreJobTasks like verify, reset and cleanup.
685 They are created through _schedule_new_jobs and associated with a hqe
686 This method translates SpecialTasks to the appropriate AgentTask and
687 adds them to the dispatchers agents list, so _handle_agents can execute
688 them.
showard65db3932009-10-28 19:54:35 +0000689 """
Prashanth B4ec98672014-05-15 10:44:54 -0700690 # When the host scheduler is responsible for acquisition we only want
691 # to run tasks with leased hosts. All hqe tasks will already have
692 # leased hosts, and we don't want to run frontend tasks till the host
693 # scheduler has vetted the assignment. Note that this doesn't include
694 # frontend tasks with hosts leased by other active hqes.
695 for task in self._job_query_manager.get_prioritized_special_tasks(
696 only_tasks_with_leased_hosts=not _inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000697 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000698 continue
showardd1195652009-12-08 22:21:02 +0000699 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000700
701
showard170873e2009-01-07 00:22:26 +0000702 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000703 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000704 # should never happen
showarded2afea2009-07-07 20:54:07 +0000705 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000706 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000707 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700708 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000709 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000710
711
jadmanski0afbb632008-06-06 21:10:57 +0000712 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000713 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700714 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000715 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000716 if self.host_has_agent(host):
717 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000718 continue
showard8cc058f2009-09-08 16:26:33 +0000719 if self._host_has_scheduled_special_task(host):
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700720 # host will have a special task scheduled on the next tick
showard8cc058f2009-09-08 16:26:33 +0000721 continue
showard170873e2009-01-07 00:22:26 +0000722 if print_message:
showardb18134f2009-03-20 20:52:18 +0000723 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000724 models.SpecialTask.objects.create(
725 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000726 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000727
728
jadmanski0afbb632008-06-06 21:10:57 +0000729 def _recover_hosts(self):
730 # recover "Repair Failed" hosts
731 message = 'Reverifying dead host %s'
732 self._reverify_hosts_where("status = 'Repair Failed'",
733 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000734
735
showard89f84db2009-03-12 20:39:13 +0000736 def _refresh_pending_queue_entries(self):
737 """
738 Lookup the pending HostQueueEntries and call our HostScheduler
739 refresh() method given that list. Return the list.
740
741 @returns A list of pending HostQueueEntries sorted in priority order.
742 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700743 queue_entries = self._job_query_manager.get_pending_queue_entries(
744 only_hostless=not _inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000745 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000746 return []
showard89f84db2009-03-12 20:39:13 +0000747 return queue_entries
748
749
showarda9545c02009-12-18 22:44:26 +0000750 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800751 """Schedule a hostless (suite) job.
752
753 @param queue_entry: The queue_entry representing the hostless job.
754 """
showarda9545c02009-12-18 22:44:26 +0000755 self.add_agent_task(HostlessQueueTask(queue_entry))
Jakob Juelichd615a1e2014-09-04 11:48:36 -0700756
757 # Need to set execution_subdir before setting the status:
758 # After a restart of the scheduler, agents will be restored for HQEs in
759 # Starting, Running, Gathering, Parsing or Archiving. To do this, the
760 # execution_subdir is needed. Therefore it must be set before entering
761 # one of these states.
762 # Otherwise, if the scheduler was interrupted between setting the status
763 # and the execution_subdir, upon it's restart restoring agents would
764 # fail.
765 # Is there a way to get a status in one of these states without going
766 # through this code? Following cases are possible:
767 # - If it's aborted before being started:
768 # active bit will be 0, so there's nothing to parse, it will just be
769 # set to completed by _find_aborting. Critical statuses are skipped.
770 # - If it's aborted or it fails after being started:
771 # It was started, so this code was executed.
772 queue_entry.update_field('execution_subdir', 'hostless')
jamesren47bd7372010-03-13 00:58:17 +0000773 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000774
775
beepscc9fc702013-12-02 12:45:38 -0800776 def _schedule_host_job(self, host, queue_entry):
777 """Schedules a job on the given host.
778
779 1. Assign the host to the hqe, if it isn't already assigned.
780 2. Create a SpecialAgentTask for the hqe.
781 3. Activate the hqe.
782
783 @param queue_entry: The job to schedule.
784 @param host: The host to schedule the job on.
785 """
786 if self.host_has_agent(host):
787 host_agent_task = list(self._host_agents.get(host.id))[0].task
788 subject = 'Host with agents assigned to an HQE'
789 message = ('HQE: %s assigned host %s, but the host has '
790 'agent: %s for queue_entry %s. The HQE '
beepsf7d35162014-02-13 16:42:13 -0800791 'will have to try and acquire a host next tick ' %
beepscc9fc702013-12-02 12:45:38 -0800792 (queue_entry, host.hostname, host_agent_task,
793 host_agent_task.queue_entry))
794 email_manager.manager.enqueue_notify_email(subject, message)
beepscc9fc702013-12-02 12:45:38 -0800795 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700796 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800797
798
showard89f84db2009-03-12 20:39:13 +0000799 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700800 """
801 Find any new HQEs and call schedule_pre_job_tasks for it.
802
803 This involves setting the status of the HQE and creating a row in the
804 db corresponding the the special task, through
805 scheduler_models._queue_special_task. The new db row is then added as
806 an agent to the dispatcher through _schedule_special_tasks and
807 scheduled for execution on the drone through _handle_agents.
808 """
showard89f84db2009-03-12 20:39:13 +0000809 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000810
beepscc9fc702013-12-02 12:45:38 -0800811 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700812 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700813 new_jobs_with_hosts = 0
814 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800815 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700816 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000817
beepscc9fc702013-12-02 12:45:38 -0800818 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000819 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000820 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700821 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000822 else:
beepscc9fc702013-12-02 12:45:38 -0800823 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700824 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700825
Gabe Black1e1c41b2015-02-04 23:55:15 -0800826 autotest_stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
beepscc9fc702013-12-02 12:45:38 -0800827 if not host_jobs:
828 return
Prashanth Bf66d51b2014-05-06 12:42:25 -0700829 if not _inline_host_acquisition:
830 message = ('Found %s jobs that need hosts though '
831 '_inline_host_acquisition=%s. Will acquire hosts.' %
832 ([str(job) for job in host_jobs],
833 _inline_host_acquisition))
834 email_manager.manager.enqueue_notify_email(
835 'Processing unexpected host acquisition requests', message)
836 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
837 for host_assignment in jobs_with_hosts:
838 self._schedule_host_job(host_assignment.host, host_assignment.job)
839 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800840
Gabe Black1e1c41b2015-02-04 23:55:15 -0800841 autotest_stats.Gauge(key).send('new_jobs_with_hosts',
842 new_jobs_with_hosts)
843 autotest_stats.Gauge(key).send('new_jobs_without_hosts',
844 new_jobs_need_hosts -
845 new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000846
847
showard8cc058f2009-09-08 16:26:33 +0000848 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700849 """
850 Adds agents to the dispatcher.
851
852 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
853 QueueTask for example, will have a job with a control file, and
854 the agent will have methods that poll, abort and check if the queue
855 task is finished. The dispatcher runs the agent_task, as well as
856 other agents in it's _agents member, through _handle_agents, by
857 calling the Agents tick().
858
859 This method creates an agent for each HQE in one of (starting, running,
860 gathering, parsing, archiving) states, and adds it to the dispatcher so
861 it is handled by _handle_agents.
862 """
showardd1195652009-12-08 22:21:02 +0000863 for agent_task in self._get_queue_entry_agent_tasks():
864 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000865
866
867 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000868 for entry in scheduler_models.HostQueueEntry.fetch(
869 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000870 task = entry.job.schedule_delayed_callback_task(entry)
871 if task:
showardd1195652009-12-08 22:21:02 +0000872 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000873
874
jadmanski0afbb632008-06-06 21:10:57 +0000875 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700876 """
877 Looks through the afe_host_queue_entries for an aborted entry.
878
879 The aborted bit is set on an HQE in many ways, the most common
880 being when a user requests an abort through the frontend, which
881 results in an rpc from the afe to abort_host_queue_entries.
882 """
jamesrene7c65cb2010-06-08 20:38:10 +0000883 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000884 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700885 where='aborted=1 and complete=0'):
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800886
887 # If the job is running on a shard, let the shard handle aborting
888 # it and sync back the right status.
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -0800889 if entry.job.shard_id is not None and not server_utils.is_shard():
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800890 logging.info('Waiting for shard %s to abort hqe %s',
891 entry.job.shard_id, entry)
892 continue
893
showardf4a2e502009-07-28 20:06:39 +0000894 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800895
896 # The task would have started off with both is_complete and
897 # is_active = False. Aborted tasks are neither active nor complete.
898 # For all currently active tasks this will happen through the agent,
899 # but we need to manually update the special tasks that haven't
900 # started yet, because they don't have agents.
901 models.SpecialTask.objects.filter(is_active=False,
902 queue_entry_id=entry.id).update(is_complete=True)
903
showardd3dc1992009-04-22 21:01:40 +0000904 for agent in self.get_agents_for_entry(entry):
905 agent.abort()
906 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000907 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700908 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000909 for job in jobs_to_stop:
910 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000911
912
beeps8bb1f7d2013-08-05 01:30:09 -0700913 def _find_aborted_special_tasks(self):
914 """
915 Find SpecialTasks that have been marked for abortion.
916
917 Poll the database looking for SpecialTasks that are active
918 and have been marked for abortion, then abort them.
919 """
920
921 # The completed and active bits are very important when it comes
922 # to scheduler correctness. The active bit is set through the prolog
923 # of a special task, and reset through the cleanup method of the
924 # SpecialAgentTask. The cleanup is called both through the abort and
925 # epilog. The complete bit is set in several places, and in general
926 # a hanging job will have is_active=1 is_complete=0, while a special
927 # task which completed will have is_active=0 is_complete=1. To check
928 # aborts we directly check active because the complete bit is set in
929 # several places, including the epilog of agent tasks.
930 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
931 is_aborted=True)
932 for task in aborted_tasks:
933 # There are 2 ways to get the agent associated with a task,
934 # through the host and through the hqe. A special task
935 # always needs a host, but doesn't always need a hqe.
936 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700937 if isinstance(agent.task, agent_task.SpecialAgentTask):
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000938
beeps8bb1f7d2013-08-05 01:30:09 -0700939 # The epilog preforms critical actions such as
940 # queueing the next SpecialTask, requeuing the
941 # hqe etc, however it doesn't actually kill the
942 # monitor process and set the 'done' bit. Epilogs
943 # assume that the job failed, and that the monitor
944 # process has already written an exit code. The
945 # done bit is a necessary condition for
946 # _handle_agents to schedule any more special
947 # tasks against the host, and it must be set
948 # in addition to is_active, is_complete and success.
949 agent.task.epilog()
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000950 agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700951
952
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700953 def _can_start_agent(self, agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000954 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000955 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000956 return True
957 # don't allow any nonzero-process agents to run after we've reached a
958 # limit (this avoids starvation of many-process agents)
959 if have_reached_limit:
960 return False
961 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000962 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000963 agent.task.owner_username,
964 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000965 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000966 return False
showard4c5374f2008-09-04 17:02:56 +0000967 return True
968
969
jadmanski0afbb632008-06-06 21:10:57 +0000970 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700971 """
972 Handles agents of the dispatcher.
973
974 Appropriate Agents are added to the dispatcher through
975 _schedule_running_host_queue_entries. These agents each
976 have a task. This method runs the agents task through
977 agent.tick() leading to:
978 agent.start
979 prolog -> AgentTasks prolog
980 For each queue entry:
981 sets host status/status to Running
982 set started_on in afe_host_queue_entries
983 run -> AgentTasks run
984 Creates PidfileRunMonitor
985 Queues the autoserv command line for this AgentTask
986 via the drone manager. These commands are executed
987 through the drone managers execute actions.
988 poll -> AgentTasks/BaseAgentTask poll
989 checks the monitors exit_code.
990 Executes epilog if task is finished.
991 Executes AgentTasks _finish_task
992 finish_task is usually responsible for setting the status
993 of the HQE/host, and updating it's active and complete fileds.
994
995 agent.is_done
996 Removed the agent from the dispatchers _agents queue.
997 Is_done checks the finished bit on the agent, that is
998 set based on the Agents task. During the agents poll
999 we check to see if the monitor process has exited in
1000 it's finish method, and set the success member of the
1001 task based on this exit code.
1002 """
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001003 num_started_this_tick = 0
1004 num_finished_this_tick = 0
showard4c5374f2008-09-04 17:02:56 +00001005 have_reached_limit = False
1006 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -07001007 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +00001008 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -07001009 self._log_extra_msg('Processing Agent with Host Ids: %s and '
1010 'queue_entry ids:%s' % (agent.host_ids,
1011 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +00001012 if not agent.started:
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001013 if not self._can_start_agent(agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001014 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -07001015 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +00001016 continue
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001017 num_started_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001018 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +00001019 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -07001020 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +00001021 if agent.is_done():
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001022 num_finished_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001023 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +00001024 self.remove_agent(agent)
Gabe Black1e1c41b2015-02-04 23:55:15 -08001025 autotest_stats.Gauge('scheduler.jobs_per_tick').send(
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001026 'agents_started', num_started_this_tick)
Gabe Black1e1c41b2015-02-04 23:55:15 -08001027 autotest_stats.Gauge('scheduler.jobs_per_tick').send(
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001028 'agents_finished', num_finished_this_tick)
1029 logging.info('%d running processes. %d added this tick.',
Simran Basi3f6717d2012-09-13 15:21:22 -07001030 _drone_manager.total_running_processes(),
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001031 num_started_this_tick)
mbligh36768f02008-02-22 18:28:33 +00001032
1033
showard29f7cd22009-04-29 21:16:24 +00001034 def _process_recurring_runs(self):
1035 recurring_runs = models.RecurringRun.objects.filter(
1036 start_date__lte=datetime.datetime.now())
1037 for rrun in recurring_runs:
1038 # Create job from template
1039 job = rrun.job
1040 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001041 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001042
1043 host_objects = info['hosts']
1044 one_time_hosts = info['one_time_hosts']
1045 metahost_objects = info['meta_hosts']
1046 dependencies = info['dependencies']
1047 atomic_group = info['atomic_group']
1048
1049 for host in one_time_hosts or []:
1050 this_host = models.Host.create_one_time_host(host.hostname)
1051 host_objects.append(this_host)
1052
1053 try:
1054 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001055 options=options,
showard29f7cd22009-04-29 21:16:24 +00001056 host_objects=host_objects,
1057 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001058 atomic_group=atomic_group)
1059
1060 except Exception, ex:
1061 logging.exception(ex)
1062 #TODO send email
1063
1064 if rrun.loop_count == 1:
1065 rrun.delete()
1066 else:
1067 if rrun.loop_count != 0: # if not infinite loop
1068 # calculate new start_date
1069 difference = datetime.timedelta(seconds=rrun.loop_period)
1070 rrun.start_date = rrun.start_date + difference
1071 rrun.loop_count -= 1
1072 rrun.save()
1073
1074
Simran Basia858a232012-08-21 11:04:37 -07001075SiteDispatcher = utils.import_site_class(
1076 __file__, 'autotest_lib.scheduler.site_monitor_db',
1077 'SiteDispatcher', BaseDispatcher)
1078
1079class Dispatcher(SiteDispatcher):
1080 pass
1081
1082
mbligh36768f02008-02-22 18:28:33 +00001083class Agent(object):
showard77182562009-06-10 00:16:05 +00001084 """
Alex Miller47715eb2013-07-24 03:34:01 -07001085 An agent for use by the Dispatcher class to perform a task. An agent wraps
1086 around an AgentTask mainly to associate the AgentTask with the queue_entry
1087 and host ids.
showard77182562009-06-10 00:16:05 +00001088
1089 The following methods are required on all task objects:
1090 poll() - Called periodically to let the task check its status and
1091 update its internal state. If the task succeeded.
1092 is_done() - Returns True if the task is finished.
1093 abort() - Called when an abort has been requested. The task must
1094 set its aborted attribute to True if it actually aborted.
1095
1096 The following attributes are required on all task objects:
1097 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001098 success - bool, True if this task succeeded.
1099 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1100 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001101 """
1102
1103
showard418785b2009-11-23 20:19:59 +00001104 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001105 """
Alex Miller47715eb2013-07-24 03:34:01 -07001106 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001107 """
showard8cc058f2009-09-08 16:26:33 +00001108 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001109
showard77182562009-06-10 00:16:05 +00001110 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001111 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001112
showard8cc058f2009-09-08 16:26:33 +00001113 self.queue_entry_ids = task.queue_entry_ids
1114 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001115
showard8cc058f2009-09-08 16:26:33 +00001116 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001117 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001118
1119
jadmanski0afbb632008-06-06 21:10:57 +00001120 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001121 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001122 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001123 self.task.poll()
1124 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001125 self.finished = True
showardec113162008-05-08 00:52:49 +00001126
1127
jadmanski0afbb632008-06-06 21:10:57 +00001128 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001129 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001130
1131
showardd3dc1992009-04-22 21:01:40 +00001132 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001133 if self.task:
1134 self.task.abort()
1135 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001136 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001137 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001138
showardd3dc1992009-04-22 21:01:40 +00001139
beeps5e2bb4a2013-10-28 11:26:45 -07001140class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001141 """
1142 Common functionality for QueueTask and HostlessQueueTask
1143 """
1144 def __init__(self, queue_entries):
1145 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001146 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001147 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001148
1149
showard73ec0442009-02-07 02:05:20 +00001150 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001151 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001152
1153
jamesrenc44ae992010-02-19 00:12:54 +00001154 def _write_control_file(self, execution_path):
1155 control_path = _drone_manager.attach_file_to_execution(
1156 execution_path, self.job.control_file)
1157 return control_path
1158
1159
Aviv Keshet308e7362013-05-21 14:43:16 -07001160 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001161 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001162 execution_path = self.queue_entries[0].execution_path()
1163 control_path = self._write_control_file(execution_path)
1164 hostnames = ','.join(entry.host.hostname
1165 for entry in self.queue_entries
1166 if not entry.is_hostless())
1167
1168 execution_tag = self.queue_entries[0].execution_tag()
1169 params = _autoserv_command_line(
1170 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001171 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001172 _drone_manager.absolute_path(control_path)],
1173 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001174 if self.job.is_image_update_job():
1175 params += ['--image', self.job.update_image_path]
1176
jamesrenc44ae992010-02-19 00:12:54 +00001177 return params
showardd1195652009-12-08 22:21:02 +00001178
1179
1180 @property
1181 def num_processes(self):
1182 return len(self.queue_entries)
1183
1184
1185 @property
1186 def owner_username(self):
1187 return self.job.owner
1188
1189
    def _working_directory(self):
        """@returns the execution path shared by all queue entries."""
        return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001192
1193
jadmanski0afbb632008-06-06 21:10:57 +00001194 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001195 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001196 keyval_dict = self.job.keyval_dict()
1197 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001198 group_name = self.queue_entries[0].get_group_name()
1199 if group_name:
1200 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001201 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001202 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001203 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001204 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001205
1206
showard35162b02009-03-03 02:17:30 +00001207 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001208 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001209 _drone_manager.write_lines_to_file(error_file_path,
1210 [_LOST_PROCESS_ERROR])
1211
1212
showardd3dc1992009-04-22 21:01:40 +00001213 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001214 if not self.monitor:
1215 return
1216
showardd9205182009-04-27 20:09:55 +00001217 self._write_job_finished()
1218
showard35162b02009-03-03 02:17:30 +00001219 if self.monitor.lost_process:
1220 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001221
jadmanskif7fa2cc2008-10-01 14:13:23 +00001222
showardcbd74612008-11-19 21:42:02 +00001223 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001224 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001225 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001226 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001227 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001228
1229
jadmanskif7fa2cc2008-10-01 14:13:23 +00001230 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001231 if not self.monitor or not self.monitor.has_process():
1232 return
1233
jadmanskif7fa2cc2008-10-01 14:13:23 +00001234 # build up sets of all the aborted_by and aborted_on values
1235 aborted_by, aborted_on = set(), set()
1236 for queue_entry in self.queue_entries:
1237 if queue_entry.aborted_by:
1238 aborted_by.add(queue_entry.aborted_by)
1239 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1240 aborted_on.add(t)
1241
1242 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001243 # TODO(showard): this conditional is now obsolete, we just need to leave
1244 # it in temporarily for backwards compatibility over upgrades. delete
1245 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001246 assert len(aborted_by) <= 1
1247 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001248 aborted_by_value = aborted_by.pop()
1249 aborted_on_value = max(aborted_on)
1250 else:
1251 aborted_by_value = 'autotest_system'
1252 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001253
showarda0382352009-02-11 23:36:43 +00001254 self._write_keyval_after_job("aborted_by", aborted_by_value)
1255 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001256
showardcbd74612008-11-19 21:42:02 +00001257 aborted_on_string = str(datetime.datetime.fromtimestamp(
1258 aborted_on_value))
1259 self._write_status_comment('Job aborted by %s on %s' %
1260 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001261
1262
    def abort(self):
        # Abort the underlying agent task first, then record the abort
        # metadata and finalize the job's keyvals.
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()
showard21baa452008-10-21 00:08:39 +00001267
1268
    def epilog(self):
        # Normal-completion path: finalize job keyvals after the base epilog.
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001272
1273
class QueueTask(AbstractQueueTask):
    """Queue task for jobs that run against real hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Validate entry/host states, then mark each host Running + dirty."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)


    def _finish_task(self):
        """Advance entries to Gathering; hosts remain in Running status."""
        super(QueueTask, self)._finish_task()

        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Extend the base autoserv command line with SSP and build flags."""
        invocation = super(QueueTask, self)._command_line()
        # Server-side packaging applies only to server-control jobs, when
        # containers are enabled and the job has not opted out.
        use_ssp = (_enable_ssp_container and
                   self.job.control_type == control_data.CONTROL_TYPE.SERVER and
                   self.job.require_ssp != False)
        if use_ssp:
            invocation += ['--require-ssp']
            build = self.job.keyval_dict().get('test_source_build', None)
            if build:
                invocation += ['--test_source_build', build]
        if self.job.parent_job_id:
            invocation += ['--parent_job_id', str(self.job.parent_job_id)]
        return invocation + ['--verify_job_repo_url']
Alex Miller9f01d5d2013-08-08 02:26:01 -07001318
1319
class HostlessQueueTask(AbstractQueueTask):
    """Queue task for entries that run without any host."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()

        # A job enters the database in Starting status.  Each tick the
        # scheduler tries to launch Starting jobs; when a limit such as
        # max_hostless_jobs_per_drone is hit, the job is left in Starting
        # and no autoserv process has been spawned for it.  With no process
        # there are no logs to parse and execution_subdir is still unset,
        # so moving to Parsing would raise in the scheduler.  Only advance
        # to Parsing once the entry has actually left Starting.
        entry = self.queue_entries[0]
        if entry.status != models.HostQueueEntry.Status.STARTING:
            entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001347
1348
if __name__ == '__main__':
    # Script entry point: run the scheduler's main loop (main() is defined
    # earlier in this file).
    main()