blob: d520a404f219881392a8c6eda977091643feda06 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Richard Barnetteffed1722016-05-18 15:57:22 -07002
3#pylint: disable=C0111
4
mbligh36768f02008-02-22 18:28:33 +00005"""
6Autotest scheduler
7"""
showard909c7a62008-07-15 21:52:38 +00008
Dan Shif6c65bd2014-08-29 16:15:07 -07009import datetime
10import gc
11import logging
12import optparse
13import os
14import signal
15import sys
16import time
showard402934a2009-12-21 22:20:47 +000017
Alex Miller05d7b4c2013-03-04 07:49:38 -080018import common
showard21baa452008-10-21 00:08:39 +000019from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000020
21import django.db
Richard Barnetteffed1722016-05-18 15:57:22 -070022from chromite.lib import ts_mon_config
showard402934a2009-12-21 22:20:47 +000023
Dan Shiec1d47d2015-02-13 11:38:13 -080024from autotest_lib.client.common_lib import control_data
Prashanth B0e960282014-05-13 19:38:28 -070025from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070026from autotest_lib.client.common_lib import utils
Gabe Black1e1c41b2015-02-04 23:55:15 -080027from autotest_lib.client.common_lib.cros.graphite import autotest_stats
Prashanth B0e960282014-05-13 19:38:28 -070028from autotest_lib.frontend.afe import models, rpc_utils
Fang Dengc330bee2014-10-21 18:10:55 -070029from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070030from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
31from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070032from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070033from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070034from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000035from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080036from autotest_lib.scheduler import status_server, scheduler_config
Aviv Keshet308e7362013-05-21 14:43:16 -070037from autotest_lib.server import autoserv_utils
Dan Shi114e1722016-01-10 18:12:53 -080038from autotest_lib.server import system_utils
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080039from autotest_lib.server import utils as server_utils
Dan Shicf2e8dd2015-05-07 17:18:48 -070040from autotest_lib.site_utils import metadata_reporter
Dan Shib9144a42014-12-01 16:09:32 -080041from autotest_lib.site_utils import server_manager_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080042
Dan Shicf2e8dd2015-05-07 17:18:48 -070043
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the environment to override the autotest install location.
# NOTE: was os.environ.has_key('AUTOTEST_DIR'); dict.has_key is deprecated
# (and removed in Python 3) -- use the `in` operator instead.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level scheduler state, populated by initialize().
_db_manager = None
_db = None
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None

# Whether the scheduler acquires/releases hosts itself during the tick
# (see BaseDispatcher.__init__ for how this selects the host scheduler).
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)

_enable_ssp_container = global_config.global_config.get_config_value(
        'AUTOSERV', 'enable_ssp_container', type=bool,
        default=True)
mbligh36768f02008-02-22 18:28:33 +000081
mbligh83c1e9e2009-05-01 23:10:41 +000082def _site_init_monitor_db_dummy():
83 return {}
84
85
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled without a default."""
    if not models.DroneSet.drone_sets_enabled():
        return
    if not models.DroneSet.default_drone_set_name():
        raise scheduler_lib.SchedulerError(
                'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000091
92
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler.

    @raises SchedulerError: if drone sets are enabled but no default drone
            set is configured (via _verify_default_drone_set_exists).
    """
    _verify_default_drone_set_exists()
96
97
def main():
    """Top-level entry point.

    Runs the scheduler, logging any exception that escapes, and always
    removes the scheduler pid file on the way out.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        # Deliberate exits propagate without being logged as errors.
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000109
110
def main_without_exception_handling():
    """Parse command-line options and run the scheduler's main loop.

    Sets up logging, validates the production/database settings, requires
    the scheduler to be enabled in the global config, starts the status
    server, metadata reporter and ts_mon state, then ticks the Dispatcher
    until a shutdown is requested (via signal handler or the status
    server). On exit, tears down the metadata reporter, status server,
    drone manager and database connection.

    Expects exactly one positional argument: the results directory.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Optional site-specific initialization hook; falls back to the no-op
    # _site_init_monitor_db_dummy when no site module is present.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    # Start the thread to report metadata.
    metadata_reporter.start()

    ts_mon_config.SetupTsMonGlobalState('autotest_scheduler')

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)
        minimum_tick_sec = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)

        # Main loop: tick, then sleep the remainder of minimum_tick_sec
        # (or a token 0.0001s when the tick already took longer).
        while not _shutdown and not server._shutdown_scheduler:
            start = time.time()
            dispatcher.tick()
            curr_tick_sec = time.time() - start
            if (minimum_tick_sec > curr_tick_sec):
                time.sleep(minimum_tick_sec - curr_tick_sec)
            else:
                time.sleep(0.0001)
    except server_manager_utils.ServerActionError as e:
        # This error is expected when the server is not in primary status
        # for scheduler role. Thus do not send email for it.
        logging.exception(e)
    except Exception:
        email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    metadata_reporter.abort()
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000204
205
def handle_signal(signum, frame):
    """Signal handler (registered for SIGINT and SIGTERM in initialize()).

    Sets the module-level _shutdown flag, which the main loop in
    main_without_exception_handling() checks between ticks, so the
    scheduler exits gracefully.

    @param signum: received signal number (unused).
    @param frame: current stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000210
211
def initialize():
    """One-time scheduler start-up.

    Exits with status 1 if another monitor_db is already running; otherwise
    writes this process's pid file, applies testing-mode DB overrides,
    verifies the server-database 'scheduler' role when the server DB is in
    use, connects to the database, installs signal handlers, and initializes
    scheduler models and the drone manager.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start a second scheduler against the same pid file.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point the scheduler at the stress-test database when under test.
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    # If server database is enabled, check if the server has role `scheduler`.
    # If the server does not have scheduler role, exception will be raised and
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    drone_list = system_utils.get_drones()
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host',
            default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000251
252
def initialize_globals():
    """Bind the module-level _drone_manager to the drone_manager singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
256
257
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """Build the autoserv command line for a job.

    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    return autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose, in_lab=True)
showard87ba02a2009-04-20 19:37:32 +0000276
277
Simran Basia858a232012-08-21 11:04:37 -0700278class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800279
280
    def __init__(self):
        # Active Agent objects currently managed by this dispatcher.
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db, _drone_manager)
        # Indexes from host id / queue-entry id to the set of Agents
        # registered against them (see _register_agent_for_ids).
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        # Rate-limiting state for _garbage_collection().
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        # Debug-logging switches consumed by _log_tick_msg/_log_extra_msg.
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
312
mbligh36768f02008-02-22 18:28:33 +0000313
    def initialize(self, recover_hosts=True):
        """Prepare the dispatcher for its first tick.

        @param recover_hosts: if True, also attempt to recover hosts left
                in unexpected states by a previous scheduler run.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()
        # Execute all actions queued in the cleanup tasks. Scheduler tick will
        # run a refresh task first. If there is any action in the queue,
        # refresh will raise an exception.
        _drone_manager.execute_actions()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000327
328
Simran Basi0ec94dd2012-08-28 09:50:10 -0700329 def _log_tick_msg(self, msg):
330 if self._tick_debug:
331 logging.debug(msg)
332
333
Simran Basidef92872012-09-20 13:34:34 -0700334 def _log_extra_msg(self, msg):
335 if self._extra_debugging:
336 logging.debug(msg)
337
338
    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.

        The ordering of steps is significant -- see the comment before
        _run_cleanup() below.
        """
        timer = autotest_stats.Timer('scheduler.tick')
        # Refresh cached drone information before making scheduling decisions.
        system_utils.DroneCache.refresh()
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
        _drone_manager.trigger_refresh()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
        _drone_manager.sync_refresh()
        # _run_cleanup must be called between drone_manager.sync_refresh, and
        # drone_manager.execute_actions, as sync_refresh will clear the calls
        # queued in drones. Therefore, any action that calls drone.queue_call
        # to add calls to the drone._calls, should be after drone refresh is
        # completed and before drone_manager.execute_actions at the end of the
        # tick.
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000389
showard97aed502008-11-04 02:01:24 +0000390
    def _run_cleanup(self):
        """Give both cleanup tasks a chance to run.

        run_cleanup_maybe presumably self-throttles (hence 'maybe') -- see
        monitor_db_cleanup for the actual scheduling policy.
        """
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000394
mbligh36768f02008-02-22 18:28:33 +0000395
showardf13a9e22009-12-18 22:54:09 +0000396 def _garbage_collection(self):
397 threshold_time = time.time() - self._seconds_between_garbage_stats
398 if threshold_time < self._last_garbage_stats_time:
399 # Don't generate these reports very often.
400 return
401
402 self._last_garbage_stats_time = time.time()
403 # Force a full level 0 collection (because we can, it doesn't hurt
404 # at this interval).
405 gc.collect()
406 logging.info('Logging garbage collector stats on tick %d.',
407 self._tick_count)
408 gc_stats._log_garbage_collector_stats()
409
410
showard170873e2009-01-07 00:22:26 +0000411 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
412 for object_id in object_ids:
413 agent_dict.setdefault(object_id, set()).add(agent)
414
415
416 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
417 for object_id in object_ids:
418 assert object_id in agent_dict
419 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700420 # If an ID has no more active agent associated, there is no need to
421 # keep it in the dictionary. Otherwise, scheduler will keep an
422 # unnecessarily big dictionary until being restarted.
423 if not agent_dict[object_id]:
424 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000425
426
    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatchers list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to 1. a dict against the queue_entry_ids given to it 2. A dict
        against the host_ids given to it. So theoretically, a host can have
        any number of agents associated with it, and each of them can have any
        special agent task, though in practice we never see > 1 agent/task per
        host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        # The agent keeps a reference back to this dispatcher.
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000447
showard170873e2009-01-07 00:22:26 +0000448
449 def get_agents_for_entry(self, queue_entry):
450 """
451 Find agents corresponding to the specified queue_entry.
452 """
showardd3dc1992009-04-22 21:01:40 +0000453 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000454
455
456 def host_has_agent(self, host):
457 """
458 Determine if there is currently an Agent present using this host.
459 """
460 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000461
462
jadmanski0afbb632008-06-06 21:10:57 +0000463 def remove_agent(self, agent):
464 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000465 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
466 agent)
467 self._unregister_agent_for_ids(self._queue_entry_agents,
468 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000469
470
showard8cc058f2009-09-08 16:26:33 +0000471 def _host_has_scheduled_special_task(self, host):
472 return bool(models.SpecialTask.objects.filter(host__id=host.id,
473 is_active=False,
474 is_complete=False))
475
476
    def _recover_processes(self):
        """Recover agent tasks left running by a previous scheduler process.

        Builds recovery AgentTasks for queue entries and active special
        tasks, registers their pidfiles, refreshes the drone manager so
        those pidfiles can be read, then recovers the tasks plus any
        pending/verifying entries.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000489
showard170873e2009-01-07 00:22:26 +0000490
showardd1195652009-12-08 22:21:02 +0000491 def _create_recovery_agent_tasks(self):
492 return (self._get_queue_entry_agent_tasks()
493 + self._get_special_task_agent_tasks(is_active=True))
494
495
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified
        states, say parsing, and getting an AgentTask for it, like the
        FinalReparseTask, through _get_agent_task_for_queue_entry. Each queue
        entry can only have one agent task at a time, but there might be
        multiple queue entries in the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks
        # (Verifying is handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)
        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
                'running', len(queue_entries))

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            # Mark every entry in the task's group as consumed so a
            # synchronous job yields only one task.
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000534
535
showardd1195652009-12-08 22:21:02 +0000536 def _get_special_task_agent_tasks(self, is_active=False):
537 special_tasks = models.SpecialTask.objects.filter(
538 is_active=is_active, is_complete=False)
539 return [self._get_agent_task_for_special_task(task)
540 for task in special_tasks]
541
542
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        @raises SchedulerError: if the entry's status is not one handled
                here (Starting/Running/Gathering/Parsing/Archiving).
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000568
569
570 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000571 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
572 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000573 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000574 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000575 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000576 if using_host:
showardd1195652009-12-08 22:21:02 +0000577 self._assert_host_has_no_agent(task_entry)
578
579
    def _assert_host_has_no_agent(self, entry):
        """Raise if entry's host already has an agent registered.

        @param entry: a HostQueueEntry or a SpecialTask
        @raises SchedulerError: if an agent exists for entry.host.
        """
        if self.host_has_agent(entry.host):
            # Pull an arbitrary agent from the host's set for the message.
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise scheduler_lib.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))
589
590
591 def _get_agent_task_for_special_task(self, special_task):
592 """
593 Construct an AgentTask class to run the given SpecialTask and add it
594 to this dispatcher.
beeps8bb1f7d2013-08-05 01:30:09 -0700595
MK Ryu35d661e2014-09-25 17:44:10 -0700596 A special task is created through schedule_special_tasks, but only if
beeps8bb1f7d2013-08-05 01:30:09 -0700597 the host doesn't already have an agent. This happens through
598 add_agent_task. All special agent tasks are given a host on creation,
599 and a Null hqe. To create a SpecialAgentTask object, you need a
600 models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
601 object contains a hqe it's passed on to the special agent task, which
602 creates a HostQueueEntry and saves it as it's queue_entry.
603
showardd1195652009-12-08 22:21:02 +0000604 @param special_task: a models.SpecialTask instance
605 @returns an AgentTask to run this SpecialTask
606 """
607 self._assert_host_has_no_agent(special_task)
608
beeps5e2bb4a2013-10-28 11:26:45 -0700609 special_agent_task_classes = (prejob_task.CleanupTask,
610 prejob_task.VerifyTask,
611 prejob_task.RepairTask,
612 prejob_task.ResetTask,
613 prejob_task.ProvisionTask)
614
showardd1195652009-12-08 22:21:02 +0000615 for agent_task_class in special_agent_task_classes:
616 if agent_task_class.TASK_TYPE == special_task.task:
617 return agent_task_class(task=special_task)
618
Prashanth B0e960282014-05-13 19:38:28 -0700619 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800620 'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000621
622
623 def _register_pidfiles(self, agent_tasks):
624 for agent_task in agent_tasks:
625 agent_task.register_necessary_pidfiles()
626
627
    def _recover_tasks(self, agent_tasks):
        """Re-attach recovered AgentTasks and flag leftover orphan processes.

        Each task re-links to its autoserv process (if any) via recover();
        a process claimed by a task is removed from the orphan set.
        Whatever remains unclaimed is reported by
        _check_for_remaining_orphan_processes.

        @param agent_tasks: iterable of AgentTasks to recover and add.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            # A task that reconnected to its process owns it, so it is no
            # longer an orphan.
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000638
639
    def _get_unassigned_entries(self, status):
        """Yield HQEs in the given status that have no agent attached.

        @param status: a models.HostQueueEntry.Status value to filter on.
        @yields scheduler_models.HostQueueEntry instances without agents.
        """
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                           % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting, so re-check it.
                yield entry
647
648
showard6878e8b2009-07-20 22:37:45 +0000649 def _check_for_remaining_orphan_processes(self, orphans):
650 if not orphans:
651 return
652 subject = 'Unrecovered orphan autoserv processes remain'
653 message = '\n'.join(str(process) for process in orphans)
654 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000655
656 die_on_orphans = global_config.global_config.get_config_value(
657 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
658
659 if die_on_orphans:
660 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000661
showard170873e2009-01-07 00:22:26 +0000662
showard8cc058f2009-09-08 16:26:33 +0000663 def _recover_pending_entries(self):
664 for entry in self._get_unassigned_entries(
665 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000666 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000667 entry.on_pending()
668
669
showardb8900452009-10-12 20:31:01 +0000670 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000671 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000672 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
673 unrecovered_hqes = []
674 for queue_entry in queue_entries:
675 special_tasks = models.SpecialTask.objects.filter(
676 task__in=(models.SpecialTask.Task.CLEANUP,
677 models.SpecialTask.Task.VERIFY),
678 queue_entry__id=queue_entry.id,
679 is_complete=False)
680 if special_tasks.count() == 0:
681 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000682
showardb8900452009-10-12 20:31:01 +0000683 if unrecovered_hqes:
684 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700685 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000686 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000687 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000688
689
    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.

        Special tasks include PreJobTasks like verify, reset and cleanup.
        They are created through _schedule_new_jobs and associated with a
        hqe. This method translates SpecialTasks to the appropriate
        AgentTask and adds them to the dispatcher's agents list, so
        _handle_agents can execute them.
        """
        # When the host scheduler is responsible for acquisition we only want
        # to run tasks with leased hosts. All hqe tasks will already have
        # leased hosts, and we don't want to run frontend tasks till the host
        # scheduler has vetted the assignment. Note that this doesn't include
        # frontend tasks with hosts leased by other active hqes.
        for task in self._job_query_manager.get_prioritized_special_tasks(
                only_tasks_with_leased_hosts=not _inline_host_acquisition):
            # A host that already has an agent is busy; this task will be
            # picked up again on a later tick.
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000710
711
showard170873e2009-01-07 00:22:26 +0000712 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000713 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000714 # should never happen
showarded2afea2009-07-07 20:54:07 +0000715 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000716 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000717 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700718 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000719 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000720
721
    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        """Schedule a Cleanup task for unlocked, valid hosts matching `where`.

        Hosts that already have an agent or an already-scheduled special
        task are skipped, since they are being handled elsewhere.

        @param where: SQL WHERE fragment selecting candidate hosts.
        @param print_message: log format with one %s for the hostname;
                pass a falsy value to suppress logging.
        """
        full_where='locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next tick
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000737
738
jadmanski0afbb632008-06-06 21:10:57 +0000739 def _recover_hosts(self):
740 # recover "Repair Failed" hosts
741 message = 'Reverifying dead host %s'
742 self._reverify_hosts_where("status = 'Repair Failed'",
743 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000744
745
showard89f84db2009-03-12 20:39:13 +0000746 def _refresh_pending_queue_entries(self):
747 """
748 Lookup the pending HostQueueEntries and call our HostScheduler
749 refresh() method given that list. Return the list.
750
751 @returns A list of pending HostQueueEntries sorted in priority order.
752 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700753 queue_entries = self._job_query_manager.get_pending_queue_entries(
754 only_hostless=not _inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000755 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000756 return []
showard89f84db2009-03-12 20:39:13 +0000757 return queue_entries
758
759
    def _schedule_hostless_job(self, queue_entry):
        """Schedule a hostless (suite) job.

        @param queue_entry: The queue_entry representing the hostless job.
        """
        self.add_agent_task(HostlessQueueTask(queue_entry))

        # Need to set execution_subdir before setting the status:
        # After a restart of the scheduler, agents will be restored for HQEs
        # in Starting, Running, Gathering, Parsing or Archiving. To do this,
        # the execution_subdir is needed. Therefore it must be set before
        # entering one of these states.
        # Otherwise, if the scheduler was interrupted between setting the
        # status and the execution_subdir, upon its restart restoring agents
        # would fail.
        # Is there a way to get a status in one of these states without going
        # through this code? Following cases are possible:
        # - If it's aborted before being started:
        #   active bit will be 0, so there's nothing to parse, it will just
        #   be set to completed by _find_aborting. Critical statuses are
        #   skipped.
        # - If it's aborted or it fails after being started:
        #   It was started, so this code was executed.
        queue_entry.update_field('execution_subdir', 'hostless')
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000784
785
    def _schedule_host_job(self, host, queue_entry):
        """Schedules a job on the given host.

        1. Assign the host to the hqe, if it isn't already assigned.
        2. Create a SpecialAgentTask for the hqe.
        3. Activate the hqe.

        @param queue_entry: The job to schedule.
        @param host: The host to schedule the job on.
        """
        if self.host_has_agent(host):
            # The host turned out to be busy: notify and leave the HQE to
            # retry acquisition on a later tick.
            host_agent_task = list(self._host_agents.get(host.id))[0].task
            subject = 'Host with agents assigned to an HQE'
            message = ('HQE: %s assigned host %s, but the host has '
                       'agent: %s for queue_entry %s. The HQE '
                       'will have to try and acquire a host next tick ' %
                       (queue_entry, host.hostname, host_agent_task,
                        host_agent_task.queue_entry))
            email_manager.manager.enqueue_notify_email(subject, message)
        else:
            self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800807
808
    def _schedule_new_jobs(self):
        """
        Find any new HQEs and call schedule_pre_job_tasks for it.

        This involves setting the status of the HQE and creating a row in
        the db corresponding to the special task, through
        scheduler_models._queue_special_task. The new db row is then added
        as an agent to the dispatcher through _schedule_special_tasks and
        scheduled for execution on the drone through _handle_agents.
        """
        queue_entries = self._refresh_pending_queue_entries()

        key = 'scheduler.jobs_per_tick'
        new_hostless_jobs = 0
        new_jobs_with_hosts = 0
        new_jobs_need_hosts = 0
        host_jobs = []
        logging.debug('Processing %d queue_entries', len(queue_entries))

        # Hostless entries are scheduled immediately; host-bound ones are
        # collected for host assignment below.
        for queue_entry in queue_entries:
            if queue_entry.is_hostless():
                self._schedule_hostless_job(queue_entry)
                new_hostless_jobs = new_hostless_jobs + 1
            else:
                host_jobs.append(queue_entry)
                new_jobs_need_hosts = new_jobs_need_hosts + 1

        autotest_stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
        if not host_jobs:
            return
        if not _inline_host_acquisition:
            # With inline acquisition disabled the host scheduler should
            # have handled these; flag the unexpected requests by email.
            message = ('Found %s jobs that need hosts though '
                       '_inline_host_acquisition=%s. Will acquire hosts.' %
                       ([str(job) for job in host_jobs],
                        _inline_host_acquisition))
            email_manager.manager.enqueue_notify_email(
                    'Processing unexpected host acquisition requests', message)
        jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
        for host_assignment in jobs_with_hosts:
            self._schedule_host_job(host_assignment.host, host_assignment.job)
            new_jobs_with_hosts = new_jobs_with_hosts + 1

        autotest_stats.Gauge(key).send('new_jobs_with_hosts',
                                       new_jobs_with_hosts)
        autotest_stats.Gauge(key).send('new_jobs_without_hosts',
                                       new_jobs_need_hosts -
                                       new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000856
857
showard8cc058f2009-09-08 16:26:33 +0000858 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700859 """
860 Adds agents to the dispatcher.
861
862 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
863 QueueTask for example, will have a job with a control file, and
864 the agent will have methods that poll, abort and check if the queue
865 task is finished. The dispatcher runs the agent_task, as well as
866 other agents in it's _agents member, through _handle_agents, by
867 calling the Agents tick().
868
869 This method creates an agent for each HQE in one of (starting, running,
870 gathering, parsing, archiving) states, and adds it to the dispatcher so
871 it is handled by _handle_agents.
872 """
showardd1195652009-12-08 22:21:02 +0000873 for agent_task in self._get_queue_entry_agent_tasks():
874 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000875
876
877 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000878 for entry in scheduler_models.HostQueueEntry.fetch(
879 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000880 task = entry.job.schedule_delayed_callback_task(entry)
881 if task:
showardd1195652009-12-08 22:21:02 +0000882 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000883
884
    def _find_aborting(self):
        """
        Looks through the afe_host_queue_entries for an aborted entry.

        The aborted bit is set on an HQE in many ways, the most common
        being when a user requests an abort through the frontend, which
        results in an rpc from the afe to abort_host_queue_entries.
        """
        # Collect jobs in a set so each is stopped at most once, even when
        # several of its entries abort this tick.
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted=1 and complete=0'):

            # If the job is running on a shard, let the shard handle aborting
            # it and sync back the right status.
            if entry.job.shard_id is not None and not server_utils.is_shard():
                logging.info('Waiting for shard %s to abort hqe %s',
                             entry.job.shard_id, entry)
                continue

            logging.info('Aborting %s', entry)

            # The task would have started off with both is_complete and
            # is_active = False. Aborted tasks are neither active nor
            # complete. For all currently active tasks this will happen
            # through the agent, but we need to manually update the special
            # tasks that haven't started yet, because they don't have agents.
            models.SpecialTask.objects.filter(is_active=False,
                    queue_entry_id=entry.id).update(is_complete=True)

            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000921
922
    def _find_aborted_special_tasks(self):
        """
        Find SpecialTasks that have been marked for abortion.

        Poll the database looking for SpecialTasks that are active
        and have been marked for abortion, then abort them.
        """

        # The completed and active bits are very important when it comes
        # to scheduler correctness. The active bit is set through the prolog
        # of a special task, and reset through the cleanup method of the
        # SpecialAgentTask. The cleanup is called both through the abort and
        # epilog. The complete bit is set in several places, and in general
        # a hanging job will have is_active=1 is_complete=0, while a special
        # task which completed will have is_active=0 is_complete=1. To check
        # aborts we directly check active because the complete bit is set in
        # several places, including the epilog of agent tasks.
        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
                                                          is_aborted=True)
        for task in aborted_tasks:
            # There are 2 ways to get the agent associated with a task,
            # through the host and through the hqe. A special task
            # always needs a host, but doesn't always need a hqe.
            for agent in self._host_agents.get(task.host.id, []):
                if isinstance(agent.task, agent_task.SpecialAgentTask):

                    # The epilog performs critical actions such as
                    # queueing the next SpecialTask, requeuing the
                    # hqe etc, however it doesn't actually kill the
                    # monitor process and set the 'done' bit. Epilogs
                    # assume that the job failed, and that the monitor
                    # process has already written an exit code. The
                    # done bit is a necessary condition for
                    # _handle_agents to schedule any more special
                    # tasks against the host, and it must be set
                    # in addition to is_active, is_complete and success.
                    agent.task.epilog()
                    agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700961
962
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700963 def _can_start_agent(self, agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000964 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000965 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000966 return True
967 # don't allow any nonzero-process agents to run after we've reached a
968 # limit (this avoids starvation of many-process agents)
969 if have_reached_limit:
970 return False
971 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000972 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000973 agent.task.owner_username,
974 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000975 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000976 return False
showard4c5374f2008-09-04 17:02:56 +0000977 return True
978
979
    def _handle_agents(self):
        """
        Handles agents of the dispatcher.

        Appropriate Agents are added to the dispatcher through
        _schedule_running_host_queue_entries. These agents each
        have a task. This method runs the agents task through
        agent.tick() leading to:
            agent.start
                prolog -> AgentTasks prolog
                For each queue entry:
                    sets host status/status to Running
                    set started_on in afe_host_queue_entries
                run -> AgentTasks run
                Creates PidfileRunMonitor
                Queues the autoserv command line for this AgentTask
                via the drone manager. These commands are executed
                through the drone managers execute actions.
            poll -> AgentTasks/BaseAgentTask poll
                checks the monitors exit_code.
                Executes epilog if task is finished.
                Executes AgentTasks _finish_task
                finish_task is usually responsible for setting the status
                of the HQE/host, and updating its active and complete
                fields.

            agent.is_done
                Removes the agent from the dispatcher's _agents queue.
                Is_done checks the finished bit on the agent, that is
                set based on the Agents task. During the agents poll
                we check to see if the monitor process has exited in
                its finish method, and set the success member of the
                task based on this exit code.
        """
        num_started_this_tick = 0
        num_finished_this_tick = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                if not self._can_start_agent(agent, have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_tick += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                num_finished_this_tick += agent.task.num_processes
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
                'agents_started', num_started_this_tick)
        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
                'agents_finished', num_finished_this_tick)
        logging.info('%d running processes. %d added this tick.',
                     _drone_manager.total_running_processes(),
                     num_started_this_tick)
mbligh36768f02008-02-22 18:28:33 +00001042
1043
    def _process_recurring_runs(self):
        """Instantiate jobs from RecurringRun templates whose start date passed.

        Each due RecurringRun spawns a new job cloned from its template
        job, then either deletes itself (loop_count == 1) or advances its
        start_date by loop_period (loop_count == 0 means repeat forever).
        """
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            # NOTE(review): 'dependencies' is read but never passed to
            # create_new_job below - confirm whether that is intentional.
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                # Job creation failures must not kill the scheduler loop.
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0:  # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1083
1084
# Allow site-specific code to extend the dispatcher; import_site_class falls
# back to BaseDispatcher when no site_monitor_db module is present.
SiteDispatcher = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    pass
1091
1092
class Agent(object):
    """Wrapper the Dispatcher uses to drive a single AgentTask.

    An Agent associates its AgentTask with the queue-entry and host ids
    the task covers, and tracks whether the task has started/finished.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state. If the task succeeded.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested. The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task
                handles.
        host_ids - A sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: An instance of an AgentTask.
        """
        self.task = task
        # Filled in by Dispatcher.add_agent().
        self.dispatcher = None
        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids
        self.started = False
        self.finished = False


    def tick(self):
        """Poll the task once; mark this agent finished when the task is."""
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        """@returns True once the wrapped task has completed."""
        return self.finished


    def abort(self):
        """Forward an abort request to the wrapped task.

        Tasks can choose to ignore aborts; the agent only finishes when
        the task actually sets its aborted flag.
        """
        if not self.task:
            return
        self.task.abort()
        if self.task.aborted:
            self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001148
showardd3dc1992009-04-22 21:01:40 +00001149
beeps5e2bb4a2013-10-28 11:26:45 -07001150class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001151 """
1152 Common functionality for QueueTask and HostlessQueueTask
1153 """
    def __init__(self, queue_entries):
        """@param queue_entries: HQEs this task executes for (assumes all
        entries share one job - the first entry's job is used). """
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001158
1159
    def _keyval_path(self):
        """@returns path of the keyval file inside the working directory."""
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001162
1163
    def _write_control_file(self, execution_path):
        """Attach the job's control file to the execution directory.

        @param execution_path: execution directory for this job.
        @returns path of the attached control file, as reported by the
                drone manager.
        """
        control_path = _drone_manager.attach_file_to_execution(
                execution_path, self.job.control_file)
        return control_path
1168
1169
    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _command_line(self):
        """Build the autoserv command line for this job.

        @returns list of argv elements for the autoserv invocation.
        """
        execution_path = self.queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        # Hostless entries are excluded from the hostname list.
        hostnames = ','.join(entry.host.hostname
                             for entry in self.queue_entries
                             if not entry.is_hostless())

        execution_tag = self.queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self.job, verbose=False)
        if self.job.is_image_update_job():
            params += ['--image', self.job.update_image_path]

        return params
showardd1195652009-12-08 22:21:02 +00001188
1189
1190 @property
1191 def num_processes(self):
1192 return len(self.queue_entries)
1193
1194
1195 @property
1196 def owner_username(self):
1197 return self.job.owner
1198
1199
1200 def _working_directory(self):
1201 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001202
1203
    def prolog(self):
        """Record job keyvals and mark every entry Running before launch."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = self.job.keyval_dict()
        keyval_dict[queued_key] = queued_time
        group_name = self.queue_entries[0].get_group_name()
        if group_name:
            keyval_dict['host_group_name'] = group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001215
1216
showard35162b02009-03-03 02:17:30 +00001217 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001218 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001219 _drone_manager.write_lines_to_file(error_file_path,
1220 [_LOST_PROCESS_ERROR])
1221
1222
showardd3dc1992009-04-22 21:01:40 +00001223 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001224 if not self.monitor:
1225 return
1226
showardd9205182009-04-27 20:09:55 +00001227 self._write_job_finished()
1228
showard35162b02009-03-03 02:17:30 +00001229 if self.monitor.lost_process:
1230 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001231
jadmanskif7fa2cc2008-10-01 14:13:23 +00001232
showardcbd74612008-11-19 21:42:02 +00001233 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001234 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001235 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001236 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001237 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001238
1239
jadmanskif7fa2cc2008-10-01 14:13:23 +00001240 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001241 if not self.monitor or not self.monitor.has_process():
1242 return
1243
jadmanskif7fa2cc2008-10-01 14:13:23 +00001244 # build up sets of all the aborted_by and aborted_on values
1245 aborted_by, aborted_on = set(), set()
1246 for queue_entry in self.queue_entries:
1247 if queue_entry.aborted_by:
1248 aborted_by.add(queue_entry.aborted_by)
1249 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1250 aborted_on.add(t)
1251
1252 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001253 # TODO(showard): this conditional is now obsolete, we just need to leave
1254 # it in temporarily for backwards compatibility over upgrades. delete
1255 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001256 assert len(aborted_by) <= 1
1257 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001258 aborted_by_value = aborted_by.pop()
1259 aborted_on_value = max(aborted_on)
1260 else:
1261 aborted_by_value = 'autotest_system'
1262 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001263
showarda0382352009-02-11 23:36:43 +00001264 self._write_keyval_after_job("aborted_by", aborted_by_value)
1265 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001266
showardcbd74612008-11-19 21:42:02 +00001267 aborted_on_string = str(datetime.datetime.fromtimestamp(
1268 aborted_on_value))
1269 self._write_status_comment('Job aborted by %s on %s' %
1270 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001271
1272
    def abort(self):
        """Abort the task, then log the abort and finalize job bookkeeping."""
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()
showard21baa452008-10-21 00:08:39 +00001277
1278
    def epilog(self):
        """Normal-completion hook; runs the same finalization as abort()."""
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001282
1283
class QueueTask(AbstractQueueTask):
    """AgentTask that runs autoserv for a job's queue entries on real hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Validate entry/host states, then mark hosts Running and dirty."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.host.set_status(models.Host.Status.RUNNING)
            # 'dirty' flags the host for cleanup once the job finishes.
            queue_entry.host.update_field('dirty', 1)


    def _finish_task(self):
        """Advance entries to Gathering once the autoserv process exits."""
        super(QueueTask, self)._finish_task()

        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
            queue_entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Extend the base autoserv command line with job-specific flags.

        Adds server-side-packaging flags when applicable, the parent job
        id, and --verify_job_repo_url.

        @return: List of command-line arguments for autoserv.
        """
        invocation = super(QueueTask, self)._command_line()
        # Server-side packaging applies only to server-type control files
        # and only when the job did not explicitly opt out.  require_ssp
        # appears to be tri-state (None when unspecified), so test identity
        # against False (PEP 8) rather than using !=.
        if (_enable_ssp_container and
            self.job.control_type == control_data.CONTROL_TYPE.SERVER and
            self.job.require_ssp is not False):
            invocation += ['--require-ssp']
            keyval_dict = self.job.keyval_dict()
            test_source_build = keyval_dict.get('test_source_build', None)
            if test_source_build:
                invocation += ['--test_source_build', test_source_build]
        if self.job.parent_job_id:
            invocation += ['--parent_job_id', str(self.job.parent_job_id)]
        return invocation + ['--verify_job_repo_url']
Alex Miller9f01d5d2013-08-08 02:26:01 -07001328
1329
class HostlessQueueTask(AbstractQueueTask):
    """Queue task for a job that runs without any host assigned."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        """No host bookkeeping is needed; defer entirely to the base class."""
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        """Finish the task and move the entry to Parsing when appropriate."""
        super(HostlessQueueTask, self)._finish_task()

        # A freshly added job sits in Starting until a scheduler tick is
        # able to launch it; limits such as max_hostless_jobs_per_drone can
        # keep it there, in which case no autoserv process was ever started
        # and execution_subdir is unset.  Parsing such an entry would raise,
        # so only entries that actually left Starting move on to Parsing.
        entry = self.queue_entries[0]
        if entry.status != models.HostQueueEntry.Status.STARTING:
            entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001357
1358
# Script entry point: hand control to the scheduler's main loop (main() is
# defined earlier in this file).
if __name__ == '__main__':
    main()