blob: d420458bb84912a84625809da55ecc56b9667a46 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Richard Barnetteffed1722016-05-18 15:57:22 -07002
3#pylint: disable=C0111
4
mbligh36768f02008-02-22 18:28:33 +00005"""
6Autotest scheduler
7"""
showard909c7a62008-07-15 21:52:38 +00008
Dan Shif6c65bd2014-08-29 16:15:07 -07009import datetime
10import gc
11import logging
12import optparse
13import os
14import signal
15import sys
16import time
showard402934a2009-12-21 22:20:47 +000017
Alex Miller05d7b4c2013-03-04 07:49:38 -080018import common
showard21baa452008-10-21 00:08:39 +000019from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000020
21import django.db
Aviv Keshet65fed072016-06-29 10:20:55 -070022from chromite.lib import metrics
Richard Barnetteffed1722016-05-18 15:57:22 -070023from chromite.lib import ts_mon_config
showard402934a2009-12-21 22:20:47 +000024
Dan Shiec1d47d2015-02-13 11:38:13 -080025from autotest_lib.client.common_lib import control_data
Prashanth B0e960282014-05-13 19:38:28 -070026from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070027from autotest_lib.client.common_lib import utils
Prashanth B0e960282014-05-13 19:38:28 -070028from autotest_lib.frontend.afe import models, rpc_utils
Fang Dengc330bee2014-10-21 18:10:55 -070029from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070030from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
31from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070032from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070033from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070034from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000035from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080036from autotest_lib.scheduler import status_server, scheduler_config
Aviv Keshet308e7362013-05-21 14:43:16 -070037from autotest_lib.server import autoserv_utils
Dan Shi114e1722016-01-10 18:12:53 -080038from autotest_lib.server import system_utils
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080039from autotest_lib.server import utils as server_utils
Dan Shicf2e8dd2015-05-07 17:18:48 -070040from autotest_lib.site_utils import metadata_reporter
Dan Shib9144a42014-12-01 16:09:32 -080041from autotest_lib.site_utils import server_manager_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080042
Dan Shicf2e8dd2015-05-07 17:18:48 -070043
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the environment to override the computed autotest root.
# NOTE: dict.has_key() is deprecated (and gone in Python 3); use `in`.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level state, populated by initialize().
_db_manager = None
_db = None
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)

_enable_ssp_container = global_config.global_config.get_config_value(
        'AUTOSERV', 'enable_ssp_container', type=bool,
        default=True)
mbligh36768f02008-02-22 18:28:33 +000081
mbligh83c1e9e2009-05-01 23:10:41 +000082def _site_init_monitor_db_dummy():
83 return {}
84
85
def _verify_default_drone_set_exists():
    """Raise SchedulerError when drone sets are enabled without a default."""
    drone_sets_on = models.DroneSet.drone_sets_enabled()
    default_name = models.DroneSet.default_drone_set_name()
    if drone_sets_on and not default_name:
        raise scheduler_lib.SchedulerError(
                'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000091
92
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    # Currently the only consistency check is the drone-set configuration.
    _verify_default_drone_set_exists()
96
97
def main():
    """Top-level entry point: run the scheduler with exception logging.

    Delegates to main_without_exception_handling(); any exception that
    escapes (other than a deliberate SystemExit) is logged before being
    re-raised, and the pid file is removed no matter how we exit.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # Deliberate exit; do not log it as a failure.
            raise
        except:
            # Bare except on purpose: record absolutely everything that
            # escapes before re-raising it.
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        # Always release the pid file so a later restart is not blocked.
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000109
110
def main_without_exception_handling():
    """Parse arguments, start services, and run the scheduler main loop.

    Expects exactly one positional argument: the results directory.
    Configures logging, validates production settings, starts the status
    server and metadata reporter, then ticks the Dispatcher until a
    shutdown is requested (signal or status-server request).
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Site hook: falls back to the no-op dummy when no site module exists.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    # Start the thread to report metadata.
    metadata_reporter.start()

    with ts_mon_config.SetupTsMonGlobalState('autotest_scheduler',
                                             indirect=True):
        try:
            initialize()
            dispatcher = Dispatcher()
            dispatcher.initialize(recover_hosts=options.recover_hosts)
            minimum_tick_sec = global_config.global_config.get_config_value(
                    scheduler_config.CONFIG_SECTION, 'minimum_tick_sec',
                    type=float)

            # Main loop: one tick per iteration, padded up to
            # minimum_tick_sec so we never spin faster than configured.
            while not _shutdown and not server._shutdown_scheduler:
                start = time.time()
                dispatcher.tick()
                curr_tick_sec = time.time() - start
                if minimum_tick_sec > curr_tick_sec:
                    time.sleep(minimum_tick_sec - curr_tick_sec)
                else:
                    time.sleep(0.0001)
        except server_manager_utils.ServerActionError as e:
            # This error is expected when the server is not in primary status
            # for scheduler role. Thus do not send email for it.
            logging.exception(e)
        except Exception:
            email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    # Orderly teardown: stop reporters, flush email, stop services.
    metadata_reporter.abort()
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000204
205
def handle_signal(signum, frame):
    """Signal handler: request a clean exit after the current tick.

    @param signum: Signal number delivered (unused).
    @param frame: Interrupted stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000210
211
def initialize():
    """One-time scheduler start-up: pid file, DB, signals, drone manager.

    Exits the process if another monitor_db instance already holds the
    pid file. Populates the module globals _db_manager, _db and (via
    initialize_globals) _drone_manager.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to run two schedulers against the same results/database.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point at the stress-test database instead of the real one.
        global_config.global_config.override_config_value(
            scheduler_lib.DB_CONFIG_SECTION, 'database',
            'stresstest_autotest_web')

    # If server database is enabled, check if the server has role `scheduler`.
    # If the server does not have scheduler role, exception will be raised and
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    # Both INT and TERM request a clean shutdown after the current tick.
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    drone_list = system_utils.get_drones()
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000251
252
def initialize_globals():
    """Bind the module-level drone manager to the process-wide singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
256
257
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    return autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose, in_lab=True)
showard87ba02a2009-04-20 19:37:32 +0000276
277
Simran Basia858a232012-08-21 11:04:37 -0700278class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800279
280
    def __init__(self):
        # Agents currently under management by this dispatcher.
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db, _drone_manager)
        # Maps host id -> set of agents, and queue entry id -> set of agents.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        # Interval (seconds) between garbage-collector stat reports.
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        # Debug-logging knobs, read once at startup.
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
312
mbligh36768f02008-02-22 18:28:33 +0000313
    def initialize(self, recover_hosts=True):
        """Recover state left by a previous scheduler before the first tick.

        @param recover_hosts: If True, also attempt recovery of dead hosts.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()
        # Execute all actions queued in the cleanup tasks. Scheduler tick will
        # run a refresh task first. If there is any action in the queue, refresh
        # will raise an exception.
        _drone_manager.execute_actions()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000327
328
Simran Basi0ec94dd2012-08-28 09:50:10 -0700329 def _log_tick_msg(self, msg):
330 if self._tick_debug:
331 logging.debug(msg)
332
333
Simran Basidef92872012-09-20 13:34:34 -0700334 def _log_extra_msg(self, msg):
335 if self._extra_debugging:
336 logging.debug(msg)
337
338
    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.

        NOTE(review): the ordering of steps below is load-bearing — see the
        comment about _run_cleanup's position between sync_refresh and
        execute_actions. Do not reorder casually.
        """
        system_utils.DroneCache.refresh()
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
        _drone_manager.trigger_refresh()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
        _drone_manager.sync_refresh()
        # _run_cleanup must be called between drone_manager.sync_refresh, and
        # drone_manager.execute_actions, as sync_refresh will clear the calls
        # queued in drones. Therefore, any action that calls drone.queue_call
        # to add calls to the drone._calls, should be after drone refresh is
        # completed and before drone_manager.execute_actions at the end of the
        # tick.
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in
        # this sub-step.
        email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in
        # this sub-step.
        django.db.reset_queries()
        self._tick_count += 1
        metrics.Counter('chromeos/autotest/scheduler/tick').increment()
mbligh36768f02008-02-22 18:28:33 +0000391
showard97aed502008-11-04 02:01:24 +0000392
mblighf3294cc2009-04-08 21:17:38 +0000393 def _run_cleanup(self):
394 self._periodic_cleanup.run_cleanup_maybe()
395 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000396
mbligh36768f02008-02-22 18:28:33 +0000397
showardf13a9e22009-12-18 22:54:09 +0000398 def _garbage_collection(self):
399 threshold_time = time.time() - self._seconds_between_garbage_stats
400 if threshold_time < self._last_garbage_stats_time:
401 # Don't generate these reports very often.
402 return
403
404 self._last_garbage_stats_time = time.time()
405 # Force a full level 0 collection (because we can, it doesn't hurt
406 # at this interval).
407 gc.collect()
408 logging.info('Logging garbage collector stats on tick %d.',
409 self._tick_count)
410 gc_stats._log_garbage_collector_stats()
411
412
showard170873e2009-01-07 00:22:26 +0000413 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
414 for object_id in object_ids:
415 agent_dict.setdefault(object_id, set()).add(agent)
416
417
418 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
419 for object_id in object_ids:
420 assert object_id in agent_dict
421 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700422 # If an ID has no more active agent associated, there is no need to
423 # keep it in the dictionary. Otherwise, scheduler will keep an
424 # unnecessarily big dictionary until being restarted.
425 if not agent_dict[object_id]:
426 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000427
428
showardd1195652009-12-08 22:21:02 +0000429 def add_agent_task(self, agent_task):
beeps8bb1f7d2013-08-05 01:30:09 -0700430 """
431 Creates and adds an agent to the dispatchers list.
432
433 In creating the agent we also pass on all the queue_entry_ids and
434 host_ids from the special agent task. For every agent we create, we
435 add it to 1. a dict against the queue_entry_ids given to it 2. A dict
436 against the host_ids given to it. So theoritically, a host can have any
437 number of agents associated with it, and each of them can have any
438 special agent task, though in practice we never see > 1 agent/task per
439 host at any time.
440
441 @param agent_task: A SpecialTask for the agent to manage.
442 """
showardd1195652009-12-08 22:21:02 +0000443 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000444 self._agents.append(agent)
445 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000446 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
447 self._register_agent_for_ids(self._queue_entry_agents,
448 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000449
showard170873e2009-01-07 00:22:26 +0000450
451 def get_agents_for_entry(self, queue_entry):
452 """
453 Find agents corresponding to the specified queue_entry.
454 """
showardd3dc1992009-04-22 21:01:40 +0000455 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000456
457
458 def host_has_agent(self, host):
459 """
460 Determine if there is currently an Agent present using this host.
461 """
462 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000463
464
jadmanski0afbb632008-06-06 21:10:57 +0000465 def remove_agent(self, agent):
466 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000467 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
468 agent)
469 self._unregister_agent_for_ids(self._queue_entry_agents,
470 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000471
472
showard8cc058f2009-09-08 16:26:33 +0000473 def _host_has_scheduled_special_task(self, host):
474 return bool(models.SpecialTask.objects.filter(host__id=host.id,
475 is_active=False,
476 is_complete=False))
477
478
    def _recover_processes(self):
        """Reattach to autoserv processes left running by a prior scheduler.

        Builds recovery agent tasks, registers their pidfiles, refreshes the
        drone manager so those pidfiles are read, then recovers the tasks and
        pending/verifying entries before reverifying leftover hosts.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        # Pidfiles must be registered before the refresh that reads them.
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000491
showard170873e2009-01-07 00:22:26 +0000492
showardd1195652009-12-08 22:21:02 +0000493 def _create_recovery_agent_tasks(self):
494 return (self._get_queue_entry_agent_tasks()
495 + self._get_special_task_agent_tasks(is_active=True))
496
497
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        Also reports a per-status gauge of active host queue entries.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        hqe_count_by_status = {}
        for entry in queue_entries:
            # Count every entry, even those already covered by an agent.
            hqe_count_by_status[entry.status] = (
                hqe_count_by_status.get(entry.status, 0) + 1)
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            # A task's group entries are all consumed by that one task.
            used_queue_entries.update(agent_task.queue_entries)

        for status, count in hqe_count_by_status.iteritems():
            metrics.Gauge(
                'chromeos/autotest/scheduler/active_host_queue_entries'
            ).set(count, fields={'status': status})

        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000543
544
showardd1195652009-12-08 22:21:02 +0000545 def _get_special_task_agent_tasks(self, is_active=False):
546 special_tasks = models.SpecialTask.objects.filter(
547 is_active=is_active, is_complete=False)
548 return [self._get_agent_task_for_special_task(task)
549 for task in special_tasks]
550
551
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        @raises scheduler_lib.SchedulerError: if the entry's status has no
                corresponding AgentTask class.
        """
        # The whole synchronous group runs as one task.
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000577
578
579 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000580 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
581 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000582 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000583 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000584 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000585 if using_host:
showardd1195652009-12-08 22:21:02 +0000586 self._assert_host_has_no_agent(task_entry)
587
588
589 def _assert_host_has_no_agent(self, entry):
590 """
591 @param entry: a HostQueueEntry or a SpecialTask
592 """
593 if self.host_has_agent(entry.host):
594 agent = tuple(self._host_agents.get(entry.host.id))[0]
Prashanth B0e960282014-05-13 19:38:28 -0700595 raise scheduler_lib.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000596 'While scheduling %s, host %s already has a host agent %s'
597 % (entry, entry.host, agent.task))
598
599
    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
        object contains a hqe it's passed on to the special agent task, which
        creates a HostQueueEntry and saves it as it's queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        @raises scheduler_lib.SchedulerError: if no AgentTask class matches
                the special task's task type.
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        # Dispatch on the class-level TASK_TYPE marker.
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise scheduler_lib.SchedulerError(
                'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000630
631
632 def _register_pidfiles(self, agent_tasks):
633 for agent_task in agent_tasks:
634 agent_task.register_necessary_pidfiles()
635
636
637 def _recover_tasks(self, agent_tasks):
638 orphans = _drone_manager.get_orphaned_autoserv_processes()
639
640 for agent_task in agent_tasks:
641 agent_task.recover()
642 if agent_task.monitor and agent_task.monitor.has_process():
643 orphans.discard(agent_task.monitor.get_process())
644 self.add_agent_task(agent_task)
645
646 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000647
648
showard8cc058f2009-09-08 16:26:33 +0000649 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000650 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
651 % status):
showard0db3d432009-10-12 20:29:15 +0000652 if entry.status == status and not self.get_agents_for_entry(entry):
653 # The status can change during iteration, e.g., if job.run()
654 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000655 yield entry
656
657
showard6878e8b2009-07-20 22:37:45 +0000658 def _check_for_remaining_orphan_processes(self, orphans):
659 if not orphans:
660 return
661 subject = 'Unrecovered orphan autoserv processes remain'
662 message = '\n'.join(str(process) for process in orphans)
663 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000664
665 die_on_orphans = global_config.global_config.get_config_value(
666 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
667
668 if die_on_orphans:
669 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000670
showard170873e2009-01-07 00:22:26 +0000671
showard8cc058f2009-09-08 16:26:33 +0000672 def _recover_pending_entries(self):
673 for entry in self._get_unassigned_entries(
674 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000675 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000676 entry.on_pending()
677
678
showardb8900452009-10-12 20:31:01 +0000679 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000680 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000681 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
682 unrecovered_hqes = []
683 for queue_entry in queue_entries:
684 special_tasks = models.SpecialTask.objects.filter(
685 task__in=(models.SpecialTask.Task.CLEANUP,
686 models.SpecialTask.Task.VERIFY),
687 queue_entry__id=queue_entry.id,
688 is_complete=False)
689 if special_tasks.count() == 0:
690 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000691
showardb8900452009-10-12 20:31:01 +0000692 if unrecovered_hqes:
693 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700694 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000695 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000696 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000697
698
showard65db3932009-10-28 19:54:35 +0000699 def _schedule_special_tasks(self):
700 """
701 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700702
703 Special tasks include PreJobTasks like verify, reset and cleanup.
704 They are created through _schedule_new_jobs and associated with a hqe
705 This method translates SpecialTasks to the appropriate AgentTask and
706 adds them to the dispatchers agents list, so _handle_agents can execute
707 them.
showard65db3932009-10-28 19:54:35 +0000708 """
Prashanth B4ec98672014-05-15 10:44:54 -0700709 # When the host scheduler is responsible for acquisition we only want
710 # to run tasks with leased hosts. All hqe tasks will already have
711 # leased hosts, and we don't want to run frontend tasks till the host
712 # scheduler has vetted the assignment. Note that this doesn't include
713 # frontend tasks with hosts leased by other active hqes.
714 for task in self._job_query_manager.get_prioritized_special_tasks(
715 only_tasks_with_leased_hosts=not _inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000716 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000717 continue
showardd1195652009-12-08 22:21:02 +0000718 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000719
720
showard170873e2009-01-07 00:22:26 +0000721 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000722 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000723 # should never happen
showarded2afea2009-07-07 20:54:07 +0000724 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000725 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000726 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700727 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000728 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000729
730
jadmanski0afbb632008-06-06 21:10:57 +0000731 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000732 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700733 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000734 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000735 if self.host_has_agent(host):
736 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000737 continue
showard8cc058f2009-09-08 16:26:33 +0000738 if self._host_has_scheduled_special_task(host):
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700739 # host will have a special task scheduled on the next tick
showard8cc058f2009-09-08 16:26:33 +0000740 continue
showard170873e2009-01-07 00:22:26 +0000741 if print_message:
showardb18134f2009-03-20 20:52:18 +0000742 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000743 models.SpecialTask.objects.create(
744 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000745 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000746
747
jadmanski0afbb632008-06-06 21:10:57 +0000748 def _recover_hosts(self):
749 # recover "Repair Failed" hosts
750 message = 'Reverifying dead host %s'
751 self._reverify_hosts_where("status = 'Repair Failed'",
752 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000753
754
showard89f84db2009-03-12 20:39:13 +0000755 def _refresh_pending_queue_entries(self):
756 """
757 Lookup the pending HostQueueEntries and call our HostScheduler
758 refresh() method given that list. Return the list.
759
760 @returns A list of pending HostQueueEntries sorted in priority order.
761 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700762 queue_entries = self._job_query_manager.get_pending_queue_entries(
763 only_hostless=not _inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000764 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000765 return []
showard89f84db2009-03-12 20:39:13 +0000766 return queue_entries
767
768
showarda9545c02009-12-18 22:44:26 +0000769 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800770 """Schedule a hostless (suite) job.
771
772 @param queue_entry: The queue_entry representing the hostless job.
773 """
showarda9545c02009-12-18 22:44:26 +0000774 self.add_agent_task(HostlessQueueTask(queue_entry))
Jakob Juelichd615a1e2014-09-04 11:48:36 -0700775
776 # Need to set execution_subdir before setting the status:
777 # After a restart of the scheduler, agents will be restored for HQEs in
778 # Starting, Running, Gathering, Parsing or Archiving. To do this, the
779 # execution_subdir is needed. Therefore it must be set before entering
780 # one of these states.
781 # Otherwise, if the scheduler was interrupted between setting the status
782 # and the execution_subdir, upon it's restart restoring agents would
783 # fail.
784 # Is there a way to get a status in one of these states without going
785 # through this code? Following cases are possible:
786 # - If it's aborted before being started:
787 # active bit will be 0, so there's nothing to parse, it will just be
788 # set to completed by _find_aborting. Critical statuses are skipped.
789 # - If it's aborted or it fails after being started:
790 # It was started, so this code was executed.
791 queue_entry.update_field('execution_subdir', 'hostless')
jamesren47bd7372010-03-13 00:58:17 +0000792 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000793
794
beepscc9fc702013-12-02 12:45:38 -0800795 def _schedule_host_job(self, host, queue_entry):
796 """Schedules a job on the given host.
797
798 1. Assign the host to the hqe, if it isn't already assigned.
799 2. Create a SpecialAgentTask for the hqe.
800 3. Activate the hqe.
801
802 @param queue_entry: The job to schedule.
803 @param host: The host to schedule the job on.
804 """
805 if self.host_has_agent(host):
806 host_agent_task = list(self._host_agents.get(host.id))[0].task
807 subject = 'Host with agents assigned to an HQE'
808 message = ('HQE: %s assigned host %s, but the host has '
809 'agent: %s for queue_entry %s. The HQE '
beepsf7d35162014-02-13 16:42:13 -0800810 'will have to try and acquire a host next tick ' %
beepscc9fc702013-12-02 12:45:38 -0800811 (queue_entry, host.hostname, host_agent_task,
812 host_agent_task.queue_entry))
813 email_manager.manager.enqueue_notify_email(subject, message)
beepscc9fc702013-12-02 12:45:38 -0800814 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700815 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800816
817
showard89f84db2009-03-12 20:39:13 +0000818 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700819 """
820 Find any new HQEs and call schedule_pre_job_tasks for it.
821
822 This involves setting the status of the HQE and creating a row in the
823 db corresponding the the special task, through
824 scheduler_models._queue_special_task. The new db row is then added as
825 an agent to the dispatcher through _schedule_special_tasks and
826 scheduled for execution on the drone through _handle_agents.
827 """
showard89f84db2009-03-12 20:39:13 +0000828 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000829
beepscc9fc702013-12-02 12:45:38 -0800830 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700831 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700832 new_jobs_with_hosts = 0
833 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800834 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700835 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000836
beepscc9fc702013-12-02 12:45:38 -0800837 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000838 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000839 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700840 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000841 else:
beepscc9fc702013-12-02 12:45:38 -0800842 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700843 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700844
Prathmesh Prabhued7ece92016-11-23 11:19:43 -0800845 metrics.Counter(
846 'chromeos/autotest/scheduler/scheduled_jobs_hostless'
847 ).increment_by(new_hostless_jobs)
beepscc9fc702013-12-02 12:45:38 -0800848 if not host_jobs:
849 return
Prashanth Bf66d51b2014-05-06 12:42:25 -0700850 if not _inline_host_acquisition:
851 message = ('Found %s jobs that need hosts though '
852 '_inline_host_acquisition=%s. Will acquire hosts.' %
853 ([str(job) for job in host_jobs],
854 _inline_host_acquisition))
855 email_manager.manager.enqueue_notify_email(
856 'Processing unexpected host acquisition requests', message)
857 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
858 for host_assignment in jobs_with_hosts:
859 self._schedule_host_job(host_assignment.host, host_assignment.job)
860 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800861
Prathmesh Prabhued7ece92016-11-23 11:19:43 -0800862 metrics.Counter(
863 'chromeos/autotest/scheduler/scheduled_jobs_with_hosts'
864 ).increment_by(new_jobs_with_hosts)
865 # TODO(pprabhu): Decide what to do about this metric. Million dollar
866 # question: What happens to jobs that were not matched. Do they stay in
867 # the queue, and get processed right here in the next tick (then we want
868 # a guage corresponding to the number of outstanding unmatched host
869 # jobs), or are they handled somewhere else (then we need a counter
870 # corresponding to failed_to_match_with_hosts jobs).
871 #autotest_stats.Gauge(key).send('new_jobs_without_hosts',
872 # new_jobs_need_hosts -
873 # new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000874
875
showard8cc058f2009-09-08 16:26:33 +0000876 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700877 """
878 Adds agents to the dispatcher.
879
880 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
881 QueueTask for example, will have a job with a control file, and
882 the agent will have methods that poll, abort and check if the queue
883 task is finished. The dispatcher runs the agent_task, as well as
884 other agents in it's _agents member, through _handle_agents, by
885 calling the Agents tick().
886
887 This method creates an agent for each HQE in one of (starting, running,
888 gathering, parsing, archiving) states, and adds it to the dispatcher so
889 it is handled by _handle_agents.
890 """
showardd1195652009-12-08 22:21:02 +0000891 for agent_task in self._get_queue_entry_agent_tasks():
892 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000893
894
895 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000896 for entry in scheduler_models.HostQueueEntry.fetch(
897 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000898 task = entry.job.schedule_delayed_callback_task(entry)
899 if task:
showardd1195652009-12-08 22:21:02 +0000900 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000901
902
    def _find_aborting(self):
        """
        Looks through the afe_host_queue_entries for an aborted entry.

        The aborted bit is set on an HQE in many ways, the most common
        being when a user requests an abort through the frontend, which
        results in an rpc from the afe to abort_host_queue_entries.
        """
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted=1 and complete=0'):

            # If the job is running on a shard, let the shard handle aborting
            # it and sync back the right status.
            # NOTE(review): server_utils is assumed to be imported elsewhere
            # in this module - confirm.
            if entry.job.shard_id is not None and not server_utils.is_shard():
                logging.info('Waiting for shard %s to abort hqe %s',
                             entry.job.shard_id, entry)
                continue

            logging.info('Aborting %s', entry)

            # The task would have started off with both is_complete and
            # is_active = False. Aborted tasks are neither active nor complete.
            # For all currently active tasks this will happen through the agent,
            # but we need to manually update the special tasks that haven't
            # started yet, because they don't have agents.
            models.SpecialTask.objects.filter(is_active=False,
                                              queue_entry_id=entry.id).update(is_complete=True)

            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            # Collect jobs in a set so each job is stopped exactly once,
            # after all of its aborted entries have been processed.
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000939
940
    def _find_aborted_special_tasks(self):
        """
        Find SpecialTasks that have been marked for abortion.

        Poll the database looking for SpecialTasks that are active
        and have been marked for abortion, then abort them.
        """

        # The completed and active bits are very important when it comes
        # to scheduler correctness. The active bit is set through the prolog
        # of a special task, and reset through the cleanup method of the
        # SpecialAgentTask. The cleanup is called both through the abort and
        # epilog. The complete bit is set in several places, and in general
        # a hanging job will have is_active=1 is_complete=0, while a special
        # task which completed will have is_active=0 is_complete=1. To check
        # aborts we directly check active because the complete bit is set in
        # several places, including the epilog of agent tasks.
        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
                                                          is_aborted=True)
        for task in aborted_tasks:
            # There are 2 ways to get the agent associated with a task,
            # through the host and through the hqe. A special task
            # always needs a host, but doesn't always need a hqe.
            for agent in self._host_agents.get(task.host.id, []):
                if isinstance(agent.task, agent_task.SpecialAgentTask):

                    # The epilog preforms critical actions such as
                    # queueing the next SpecialTask, requeuing the
                    # hqe etc, however it doesn't actually kill the
                    # monitor process and set the 'done' bit. Epilogs
                    # assume that the job failed, and that the monitor
                    # process has already written an exit code. The
                    # done bit is a necessary condition for
                    # _handle_agents to schedule any more special
                    # tasks against the host, and it must be set
                    # in addition to is_active, is_complete and success.
                    agent.task.epilog()
                    # abort() runs the SpecialAgentTask cleanup, which (per
                    # the note above) resets the is_active bit.
                    agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700979
980
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700981 def _can_start_agent(self, agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000982 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000983 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000984 return True
985 # don't allow any nonzero-process agents to run after we've reached a
986 # limit (this avoids starvation of many-process agents)
987 if have_reached_limit:
988 return False
989 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000990 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000991 agent.task.owner_username,
992 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000993 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000994 return False
showard4c5374f2008-09-04 17:02:56 +0000995 return True
996
997
    def _handle_agents(self):
        """
        Handles agents of the dispatcher.

        Appropriate Agents are added to the dispatcher through
        _schedule_running_host_queue_entries. These agents each
        have a task. This method runs the agents task through
        agent.tick() leading to:
            agent.start
                prolog -> AgentTasks prolog
                  For each queue entry:
                    sets host status/status to Running
                    set started_on in afe_host_queue_entries
                run -> AgentTasks run
                    Creates PidfileRunMonitor
                    Queues the autoserv command line for this AgentTask
                    via the drone manager. These commands are executed
                    through the drone managers execute actions.
                poll -> AgentTasks/BaseAgentTask poll
                    checks the monitors exit_code.
                    Executes epilog if task is finished.
                    Executes AgentTasks _finish_task
                finish_task is usually responsible for setting the status
                of the HQE/host, and updating its active and complete fields.

        agent.is_done
            Removes the agent from the dispatcher's _agents queue.
            is_done checks the finished bit on the agent, that is
            set based on the Agent's task. During the agent's poll
            we check to see if the monitor process has exited in
            its finish method, and set the success member of the
            task based on this exit code.
        """
        num_started_this_tick = 0
        num_finished_this_tick = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                # Unstarted agents are subject to process throttling; once
                # one nonzero-process agent is refused, no further
                # nonzero-process agents start this tick (zero-process
                # agents still do - see _can_start_agent).
                if not self._can_start_agent(agent, have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_tick += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                num_finished_this_tick += agent.task.num_processes
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)

        # Export per-tick process accounting.
        metrics.Counter(
            'chromeos/autotest/scheduler/agent_processes_started'
        ).increment_by(num_started_this_tick)
        metrics.Counter(
            'chromeos/autotest/scheduler/agent_processes_finished'
        ).increment_by(num_finished_this_tick)
        num_agent_processes = _drone_manager.total_running_processes()
        metrics.Gauge(
            'chromeos/autotest/scheduler/agent_processes'
        ).set(num_agent_processes)
        logging.info('%d running processes. %d added this tick.',
                     num_agent_processes, num_started_this_tick)
mbligh36768f02008-02-22 18:28:33 +00001066
1067
showard29f7cd22009-04-29 21:16:24 +00001068 def _process_recurring_runs(self):
1069 recurring_runs = models.RecurringRun.objects.filter(
1070 start_date__lte=datetime.datetime.now())
1071 for rrun in recurring_runs:
1072 # Create job from template
1073 job = rrun.job
1074 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001075 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001076
1077 host_objects = info['hosts']
1078 one_time_hosts = info['one_time_hosts']
1079 metahost_objects = info['meta_hosts']
1080 dependencies = info['dependencies']
1081 atomic_group = info['atomic_group']
1082
1083 for host in one_time_hosts or []:
1084 this_host = models.Host.create_one_time_host(host.hostname)
1085 host_objects.append(this_host)
1086
1087 try:
1088 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001089 options=options,
showard29f7cd22009-04-29 21:16:24 +00001090 host_objects=host_objects,
1091 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001092 atomic_group=atomic_group)
1093
1094 except Exception, ex:
1095 logging.exception(ex)
1096 #TODO send email
1097
1098 if rrun.loop_count == 1:
1099 rrun.delete()
1100 else:
1101 if rrun.loop_count != 0: # if not infinite loop
1102 # calculate new start_date
1103 difference = datetime.timedelta(seconds=rrun.loop_period)
1104 rrun.start_date = rrun.start_date + difference
1105 rrun.loop_count -= 1
1106 rrun.save()
1107
1108
# Load a site-specific dispatcher extension if one exists; the last argument
# is the default class, so this presumably falls back to BaseDispatcher when
# autotest_lib.scheduler.site_monitor_db provides no SiteDispatcher - see
# utils.import_site_class for the exact contract.
SiteDispatcher = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    # Concrete dispatcher type used by the scheduler; all behavior comes
    # from SiteDispatcher (the site override or BaseDispatcher).
    pass
1115
1116
class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task. An agent wraps
    around an AgentTask mainly to associate the AgentTask with the queue_entry
    and host ids.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state. If the task succeeded.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested. The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: An instance of an AgentTask.
        """
        self.task = task

        # This is filled in by Dispatcher.add_agent()
        self.dispatcher = None

        # Mirror the task's associations so the dispatcher can index this
        # agent by host and by queue entry.
        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        """Poll the wrapped task once, recording when it completes."""
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        """@returns True once the wrapped task has finished or aborted."""
        return self.finished


    def abort(self):
        """Forward an abort to the task; tasks may choose to ignore aborts."""
        if not self.task:
            return
        self.task.abort()
        if self.task.aborted:
            self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001172
showardd3dc1992009-04-22 21:01:40 +00001173
beeps5e2bb4a2013-10-28 11:26:45 -07001174class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001175 """
1176 Common functionality for QueueTask and HostlessQueueTask
1177 """
1178 def __init__(self, queue_entries):
1179 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001180 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001181 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001182
1183
showard73ec0442009-02-07 02:05:20 +00001184 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001185 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001186
1187
jamesrenc44ae992010-02-19 00:12:54 +00001188 def _write_control_file(self, execution_path):
1189 control_path = _drone_manager.attach_file_to_execution(
1190 execution_path, self.job.control_file)
1191 return control_path
1192
1193
Aviv Keshet308e7362013-05-21 14:43:16 -07001194 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001195 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001196 execution_path = self.queue_entries[0].execution_path()
1197 control_path = self._write_control_file(execution_path)
1198 hostnames = ','.join(entry.host.hostname
1199 for entry in self.queue_entries
1200 if not entry.is_hostless())
1201
1202 execution_tag = self.queue_entries[0].execution_tag()
1203 params = _autoserv_command_line(
1204 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001205 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001206 _drone_manager.absolute_path(control_path)],
1207 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001208 if self.job.is_image_update_job():
1209 params += ['--image', self.job.update_image_path]
1210
jamesrenc44ae992010-02-19 00:12:54 +00001211 return params
showardd1195652009-12-08 22:21:02 +00001212
1213
    @property
    def num_processes(self):
        """Number of processes this task accounts for: one per queue entry."""
        return len(self.queue_entries)
1217
1218
1219 @property
1220 def owner_username(self):
1221 return self.job.owner
1222
1223
1224 def _working_directory(self):
1225 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001226
1227
jadmanski0afbb632008-06-06 21:10:57 +00001228 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001229 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001230 keyval_dict = self.job.keyval_dict()
1231 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001232 group_name = self.queue_entries[0].get_group_name()
1233 if group_name:
1234 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001235 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001236 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001237 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001238 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001239
1240
showard35162b02009-03-03 02:17:30 +00001241 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001242 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001243 _drone_manager.write_lines_to_file(error_file_path,
1244 [_LOST_PROCESS_ERROR])
1245
1246
showardd3dc1992009-04-22 21:01:40 +00001247 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001248 if not self.monitor:
1249 return
1250
showardd9205182009-04-27 20:09:55 +00001251 self._write_job_finished()
1252
showard35162b02009-03-03 02:17:30 +00001253 if self.monitor.lost_process:
1254 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001255
jadmanskif7fa2cc2008-10-01 14:13:23 +00001256
showardcbd74612008-11-19 21:42:02 +00001257 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001258 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001259 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001260 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001261 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001262
1263
jadmanskif7fa2cc2008-10-01 14:13:23 +00001264 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001265 if not self.monitor or not self.monitor.has_process():
1266 return
1267
jadmanskif7fa2cc2008-10-01 14:13:23 +00001268 # build up sets of all the aborted_by and aborted_on values
1269 aborted_by, aborted_on = set(), set()
1270 for queue_entry in self.queue_entries:
1271 if queue_entry.aborted_by:
1272 aborted_by.add(queue_entry.aborted_by)
1273 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1274 aborted_on.add(t)
1275
1276 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001277 # TODO(showard): this conditional is now obsolete, we just need to leave
1278 # it in temporarily for backwards compatibility over upgrades. delete
1279 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001280 assert len(aborted_by) <= 1
1281 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001282 aborted_by_value = aborted_by.pop()
1283 aborted_on_value = max(aborted_on)
1284 else:
1285 aborted_by_value = 'autotest_system'
1286 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001287
showarda0382352009-02-11 23:36:43 +00001288 self._write_keyval_after_job("aborted_by", aborted_by_value)
1289 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001290
showardcbd74612008-11-19 21:42:02 +00001291 aborted_on_string = str(datetime.datetime.fromtimestamp(
1292 aborted_on_value))
1293 self._write_status_comment('Job aborted by %s on %s' %
1294 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001295
1296
jadmanski0afbb632008-06-06 21:10:57 +00001297 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001298 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001299 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001300 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001301
1302
jadmanski0afbb632008-06-06 21:10:57 +00001303 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001304 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001305 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001306
1307
class QueueTask(AbstractQueueTask):
    """AbstractQueueTask for jobs that run against one or more hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Validate entry/host statuses, then mark hosts Running and dirty."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.host.set_status(models.Host.Status.RUNNING)
            # Mark the host dirty so cleanup runs before its next job.
            queue_entry.host.update_field('dirty', 1)


    def _finish_task(self):
        """Advance entries to Gathering (crash-info collection) on finish."""
        super(QueueTask, self)._finish_task()

        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
            queue_entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Extend the base autoserv command line with SSP/build/parent flags.

        Returns the full autoserv invocation as a list of arguments.
        """
        invocation = super(QueueTask, self)._command_line()
        # Check if server-side packaging is needed.  require_ssp is
        # tri-state: None (default) and True both opt in for server-side
        # control files; only an explicit False opts out.  Use an identity
        # test against the False singleton (PEP 8) rather than `!= False`.
        if (_enable_ssp_container and
            self.job.control_type == control_data.CONTROL_TYPE.SERVER and
            self.job.require_ssp is not False):
            invocation += ['--require-ssp']
            keyval_dict = self.job.keyval_dict()
            test_source_build = keyval_dict.get('test_source_build', None)
            if test_source_build:
                invocation += ['--test_source_build', test_source_build]
        if self.job.parent_job_id:
            invocation += ['--parent_job_id', str(self.job.parent_job_id)]
        return invocation + ['--verify_job_repo_url']
Alex Miller9f01d5d2013-08-08 02:26:01 -07001352
1353
Dan Shi1a189052013-10-28 14:41:35 -07001354class HostlessQueueTask(AbstractQueueTask):
mbligh4608b002010-01-05 18:22:35 +00001355 def __init__(self, queue_entry):
1356 super(HostlessQueueTask, self).__init__([queue_entry])
1357 self.queue_entry_ids = [queue_entry.id]
1358
1359
1360 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001361 super(HostlessQueueTask, self).prolog()
1362
1363
mbligh4608b002010-01-05 18:22:35 +00001364 def _finish_task(self):
1365 super(HostlessQueueTask, self)._finish_task()
Dan Shi76af8022013-10-19 01:59:49 -07001366
1367 # When a job is added to database, its initial status is always
1368 # Starting. In a scheduler tick, scheduler finds all jobs in Starting
1369 # status, check if any of them can be started. If scheduler hits some
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001370 # limit, e.g., max_hostless_jobs_per_drone, scheduler will
1371 # leave these jobs in Starting status. Otherwise, the jobs'
1372 # status will be changed to Running, and an autoserv process
1373 # will be started in drone for each of these jobs.
Dan Shi76af8022013-10-19 01:59:49 -07001374 # If the entry is still in status Starting, the process has not started
1375 # yet. Therefore, there is no need to parse and collect log. Without
1376 # this check, exception will be raised by scheduler as execution_subdir
1377 # for this queue entry does not have a value yet.
1378 hqe = self.queue_entries[0]
1379 if hqe.status != models.HostQueueEntry.Status.STARTING:
1380 hqe.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001381
1382
mbligh36768f02008-02-22 18:28:33 +00001383if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00001384 main()