blob: 0dbce9a199ef8e533eaa2b540cdb68541792249b [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Richard Barnetteffed1722016-05-18 15:57:22 -07002
3#pylint: disable=C0111
4
mbligh36768f02008-02-22 18:28:33 +00005"""
6Autotest scheduler
7"""
showard909c7a62008-07-15 21:52:38 +00008
Dan Shif6c65bd2014-08-29 16:15:07 -07009import datetime
10import gc
11import logging
12import optparse
13import os
14import signal
15import sys
16import time
showard402934a2009-12-21 22:20:47 +000017
Alex Miller05d7b4c2013-03-04 07:49:38 -080018import common
showard21baa452008-10-21 00:08:39 +000019from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000020
21import django.db
Aviv Keshet65fed072016-06-29 10:20:55 -070022from chromite.lib import metrics
Richard Barnetteffed1722016-05-18 15:57:22 -070023from chromite.lib import ts_mon_config
showard402934a2009-12-21 22:20:47 +000024
Dan Shiec1d47d2015-02-13 11:38:13 -080025from autotest_lib.client.common_lib import control_data
Prashanth B0e960282014-05-13 19:38:28 -070026from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070027from autotest_lib.client.common_lib import utils
Prashanth B0e960282014-05-13 19:38:28 -070028from autotest_lib.frontend.afe import models, rpc_utils
Fang Dengc330bee2014-10-21 18:10:55 -070029from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070030from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
31from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070032from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070033from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070034from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000035from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080036from autotest_lib.scheduler import status_server, scheduler_config
Aviv Keshet308e7362013-05-21 14:43:16 -070037from autotest_lib.server import autoserv_utils
Dan Shi114e1722016-01-10 18:12:53 -080038from autotest_lib.server import system_utils
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080039from autotest_lib.server import utils as server_utils
Dan Shicf2e8dd2015-05-07 17:18:48 -070040from autotest_lib.site_utils import metadata_reporter
Dan Shib9144a42014-12-01 16:09:32 -080041from autotest_lib.site_utils import server_manager_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080042
Dan Shicf2e8dd2015-05-07 17:18:48 -070043
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the environment to override the autotest root.  Use the `in`
# operator: dict.has_key() is deprecated in Python 2 and removed in Python 3.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level state bound during initialize(); None until then.
_db_manager = None
_db = None
# Set to True by handle_signal() to ask the main loop to exit.
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)

_enable_ssp_container = global_config.global_config.get_config_value(
        'AUTOSERV', 'enable_ssp_container', type=bool,
        default=True)
mbligh36768f02008-02-22 18:28:33 +000081
mbligh83c1e9e2009-05-01 23:10:41 +000082def _site_init_monitor_db_dummy():
83 return {}
84
85
def _verify_default_drone_set_exists():
    """Raise SchedulerError when drone sets are enabled without a default.

    A no-op when drone sets are disabled or a default drone set is
    configured.
    """
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise scheduler_lib.SchedulerError(
            'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000091
92
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    # Currently the only consistency check is the drone-set configuration.
    _verify_default_drone_set_exists()
96
97
def main():
    """Entry point: run the scheduler, logging any escaping exception.

    Always removes this process's pid file on exit (via finally) so a later
    invocation's already-running check does not trip over a stale file.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # Deliberate exits (bad args, scheduler disabled) are not errors.
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000109
110
def main_without_exception_handling():
    """Parse options, start supporting services and run the scheduler loop.

    Expects exactly one positional argument: the results directory.  Runs
    until _shutdown is set (signal) or the status server requests shutdown,
    then tears down the metadata reporter, drone manager and DB connection.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Site hook: falls back to the no-op dummy when no site module exists.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    # Start the thread to report metadata.
    metadata_reporter.start()

    with ts_mon_config.SetupTsMonGlobalState('autotest_scheduler',
                                             indirect=True):
        try:
            initialize()
            dispatcher = Dispatcher()
            dispatcher.initialize(recover_hosts=options.recover_hosts)
            minimum_tick_sec = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)

            # Main loop: one dispatcher tick per iteration, padded with sleep
            # so each tick takes at least minimum_tick_sec of wall time.
            while not _shutdown and not server._shutdown_scheduler:
                start = time.time()
                dispatcher.tick()
                curr_tick_sec = time.time() - start
                if minimum_tick_sec > curr_tick_sec:
                    time.sleep(minimum_tick_sec - curr_tick_sec)
                else:
                    time.sleep(0.0001)
        except server_manager_utils.ServerActionError as e:
            # This error is expected when the server is not in primary status
            # for scheduler role. Thus do not send email for it.
            logging.exception(e)
        except Exception:
            email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    # Orderly teardown; _drone_manager/_db_manager were bound by initialize().
    metadata_reporter.abort()
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000204
205
def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler: request a graceful shutdown.

    Only sets the module-level _shutdown flag; the main loop notices it at
    the top of the next tick and exits cleanly.

    @param signum: delivered signal number (unused).
    @param frame: interrupted stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000210
211
def initialize():
    """One-time scheduler startup: pid file, DB, signals and drones.

    Exits the process if another monitor_db instance already holds the pid
    file.  Binds the module globals _db_manager, _db and (via
    initialize_globals) _drone_manager.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to run two schedulers at once.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point at the stress-test database instead of the real one.
        global_config.global_config.override_config_value(
            scheduler_lib.DB_CONFIG_SECTION, 'database',
            'stresstest_autotest_web')

    # If server database is enabled, check if the server has role `scheduler`.
    # If the server does not have scheduler role, exception will be raised and
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    # Hand the drone manager its worker list and the results host.
    drone_list = system_utils.get_drones()
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000251
252
def initialize_globals():
    """Bind the module-level _drone_manager to the process-wide singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
256
257
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """Build the autoserv command line for a job.

    Thin wrapper over autoserv_utils.autoserv_run_job_command that fixes the
    autoserv directory, the drone working directory and in_lab=True.

    @param machines: string - a machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args: list - additional arguments to pass to autoserv.
    @param job: Job object - if supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry: a HostQueueEntry object - if supplied and no Job
            object was supplied, this will be used to look up the Job object.
    @param verbose: whether to pass the verbose flag to autoserv.
    @returns the autoserv command line as a list of executable + parameters.
    """
    return autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose, in_lab=True)
showard87ba02a2009-04-20 19:37:32 +0000276
277
Simran Basia858a232012-08-21 11:04:37 -0700278class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800279
280
    def __init__(self):
        """Read scheduler configuration and set up per-tick bookkeeping."""
        # Active Agent objects being driven by _handle_agents().
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
            _db, _drone_manager)
        # Maps host id -> set of Agents, and queue entry id -> set of Agents.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
            global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION,
                'gc_stats_interval_mins', type=int, default=6*60))
        # Debug-logging switches consumed by _log_tick_msg/_log_extra_msg.
        self._tick_debug = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
            default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
            default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
312
mbligh36768f02008-02-22 18:28:33 +0000313
    def initialize(self, recover_hosts=True):
        """Prepare the dispatcher before its first tick.

        @param recover_hosts: if True, also attempt to recover dead hosts.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()
        # Execute all actions queued in the cleanup tasks. Scheduler tick will
        # run a refresh task first. If there is any action in the queue, refresh
        # will raise an exception.
        _drone_manager.execute_actions()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000327
328
Simran Basi0ec94dd2012-08-28 09:50:10 -0700329 def _log_tick_msg(self, msg):
330 if self._tick_debug:
331 logging.debug(msg)
332
333
Simran Basidef92872012-09-20 13:34:34 -0700334 def _log_extra_msg(self, msg):
335 if self._extra_debugging:
336 logging.debug(msg)
337
    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/scheduler/tick_durations/tick')
    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.

        NOTE: the order of the steps below is load-bearing; see the comment
        on _run_cleanup's placement relative to the drone refresh calls.
        """
        system_utils.DroneCache.refresh()
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
        _drone_manager.trigger_refresh()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
        _drone_manager.sync_refresh()
        # _run_cleanup must be called between drone_manager.sync_refresh, and
        # drone_manager.execute_actions, as sync_refresh will clear the calls
        # queued in drones. Therefore, any action that calls drone.queue_call
        # to add calls to the drone._calls, should be after drone refresh is
        # completed and before drone_manager.execute_actions at the end of the
        # tick.
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in
        # this sub-step.
        email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in
        # this sub-step.
        django.db.reset_queries()
        self._tick_count += 1
        metrics.Counter('chromeos/autotest/scheduler/tick').increment()
mbligh36768f02008-02-22 18:28:33 +0000392
showard97aed502008-11-04 02:01:24 +0000393
    # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in this
    # sub-step.
    def _run_cleanup(self):
        """Run the periodic and 24-hour cleanups if their intervals elapsed."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000399
mbligh36768f02008-02-22 18:28:33 +0000400
Prathmesh Prabhuc95a66a2016-11-22 19:10:56 -0800401 # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in this
402 # sub-step.
showardf13a9e22009-12-18 22:54:09 +0000403 def _garbage_collection(self):
404 threshold_time = time.time() - self._seconds_between_garbage_stats
405 if threshold_time < self._last_garbage_stats_time:
406 # Don't generate these reports very often.
407 return
408
409 self._last_garbage_stats_time = time.time()
410 # Force a full level 0 collection (because we can, it doesn't hurt
411 # at this interval).
412 gc.collect()
413 logging.info('Logging garbage collector stats on tick %d.',
414 self._tick_count)
415 gc_stats._log_garbage_collector_stats()
416
417
showard170873e2009-01-07 00:22:26 +0000418 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
419 for object_id in object_ids:
420 agent_dict.setdefault(object_id, set()).add(agent)
421
422
423 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
424 for object_id in object_ids:
425 assert object_id in agent_dict
426 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700427 # If an ID has no more active agent associated, there is no need to
428 # keep it in the dictionary. Otherwise, scheduler will keep an
429 # unnecessarily big dictionary until being restarted.
430 if not agent_dict[object_id]:
431 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000432
433
    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatchers list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to 1. a dict against the queue_entry_ids given to it 2. A dict
        against the host_ids given to it. So theoretically, a host can have any
        number of agents associated with it, and each of them can have any
        special agent task, though in practice we never see > 1 agent/task per
        host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000454
showard170873e2009-01-07 00:22:26 +0000455
456 def get_agents_for_entry(self, queue_entry):
457 """
458 Find agents corresponding to the specified queue_entry.
459 """
showardd3dc1992009-04-22 21:01:40 +0000460 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000461
462
463 def host_has_agent(self, host):
464 """
465 Determine if there is currently an Agent present using this host.
466 """
467 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000468
469
    def remove_agent(self, agent):
        """Drop *agent* from the active list and both id-indexed maps."""
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000476
477
showard8cc058f2009-09-08 16:26:33 +0000478 def _host_has_scheduled_special_task(self, host):
479 return bool(models.SpecialTask.objects.filter(host__id=host.id,
480 is_active=False,
481 is_complete=False))
482
483
    def _recover_processes(self):
        """Re-attach to surviving autoserv processes after a restart.

        Registers all recovery agent tasks' pidfiles before the drone
        refresh so the refresh can pick up their state, then recovers
        pending/verifying entries and reverifies leftover hosts.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000496
showard170873e2009-01-07 00:22:26 +0000497
showardd1195652009-12-08 22:21:02 +0000498 def _create_recovery_agent_tasks(self):
499 return (self._get_queue_entry_agent_tasks()
500 + self._get_special_task_agent_tasks(is_active=True))
501
502
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        # Tally of entries per status, reported as a gauge metric below.
        hqe_count_by_status = {}
        for entry in queue_entries:
            hqe_count_by_status[entry.status] = (
                hqe_count_by_status.get(entry.status, 0) + 1)
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)

        for status, count in hqe_count_by_status.iteritems():
            metrics.Gauge(
                'chromeos/autotest/scheduler/active_host_queue_entries'
            ).set(count, fields={'status': status})

        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000548
549
showardd1195652009-12-08 22:21:02 +0000550 def _get_special_task_agent_tasks(self, is_active=False):
551 special_tasks = models.SpecialTask.objects.filter(
552 is_active=is_active, is_complete=False)
553 return [self._get_agent_task_for_special_task(task)
554 for task in special_tasks]
555
556
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        Dispatches on queue_entry.status; raises SchedulerError for any
        status not handled directly by an AgentTask.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000582
583
584 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000585 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
586 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000587 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000588 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000589 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000590 if using_host:
showardd1195652009-12-08 22:21:02 +0000591 self._assert_host_has_no_agent(task_entry)
592
593
594 def _assert_host_has_no_agent(self, entry):
595 """
596 @param entry: a HostQueueEntry or a SpecialTask
597 """
598 if self.host_has_agent(entry.host):
599 agent = tuple(self._host_agents.get(entry.host.id))[0]
Prashanth B0e960282014-05-13 19:38:28 -0700600 raise scheduler_lib.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000601 'While scheduling %s, host %s already has a host agent %s'
602 % (entry, entry.host, agent.task))
603
604
605 def _get_agent_task_for_special_task(self, special_task):
606 """
607 Construct an AgentTask class to run the given SpecialTask and add it
608 to this dispatcher.
beeps8bb1f7d2013-08-05 01:30:09 -0700609
MK Ryu35d661e2014-09-25 17:44:10 -0700610 A special task is created through schedule_special_tasks, but only if
beeps8bb1f7d2013-08-05 01:30:09 -0700611 the host doesn't already have an agent. This happens through
612 add_agent_task. All special agent tasks are given a host on creation,
613 and a Null hqe. To create a SpecialAgentTask object, you need a
614 models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
615 object contains a hqe it's passed on to the special agent task, which
616 creates a HostQueueEntry and saves it as it's queue_entry.
617
showardd1195652009-12-08 22:21:02 +0000618 @param special_task: a models.SpecialTask instance
619 @returns an AgentTask to run this SpecialTask
620 """
621 self._assert_host_has_no_agent(special_task)
622
beeps5e2bb4a2013-10-28 11:26:45 -0700623 special_agent_task_classes = (prejob_task.CleanupTask,
624 prejob_task.VerifyTask,
625 prejob_task.RepairTask,
626 prejob_task.ResetTask,
627 prejob_task.ProvisionTask)
628
showardd1195652009-12-08 22:21:02 +0000629 for agent_task_class in special_agent_task_classes:
630 if agent_task_class.TASK_TYPE == special_task.task:
631 return agent_task_class(task=special_task)
632
Prashanth B0e960282014-05-13 19:38:28 -0700633 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800634 'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000635
636
637 def _register_pidfiles(self, agent_tasks):
638 for agent_task in agent_tasks:
639 agent_task.register_necessary_pidfiles()
640
641
    def _recover_tasks(self, agent_tasks):
        """Re-attach recovered agent tasks and reap leftover processes.

        Each recovered task is re-attached to its still-running autoserv
        process (if any) and re-added to this dispatcher. Any autoserv
        process that no recovered task claims is an orphan and is reported
        (and optionally fatal) via _check_for_remaining_orphan_processes.

        @param agent_tasks: AgentTasks reconstructed from the database after
                a scheduler restart.
        """
        # Start from every orphaned autoserv process the drones know about;
        # recovered tasks below remove the processes they account for.
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            # A task that found its process is not an orphan; discard it.
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000652
653
showard8cc058f2009-09-08 16:26:33 +0000654 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000655 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
656 % status):
showard0db3d432009-10-12 20:29:15 +0000657 if entry.status == status and not self.get_agents_for_entry(entry):
658 # The status can change during iteration, e.g., if job.run()
659 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000660 yield entry
661
662
showard6878e8b2009-07-20 22:37:45 +0000663 def _check_for_remaining_orphan_processes(self, orphans):
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800664 m = 'chromeos/autotest/errors/unrecovered_orphan_processes'
665 metrics.Gauge(m).set(len(orphans))
666
showard6878e8b2009-07-20 22:37:45 +0000667 if not orphans:
668 return
669 subject = 'Unrecovered orphan autoserv processes remain'
670 message = '\n'.join(str(process) for process in orphans)
mbligh5fa9e112009-08-03 16:46:06 +0000671 die_on_orphans = global_config.global_config.get_config_value(
672 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
673
674 if die_on_orphans:
675 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000676
showard170873e2009-01-07 00:22:26 +0000677
showard8cc058f2009-09-08 16:26:33 +0000678 def _recover_pending_entries(self):
679 for entry in self._get_unassigned_entries(
680 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000681 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000682 entry.on_pending()
683
684
showardb8900452009-10-12 20:31:01 +0000685 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000686 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000687 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
688 unrecovered_hqes = []
689 for queue_entry in queue_entries:
690 special_tasks = models.SpecialTask.objects.filter(
691 task__in=(models.SpecialTask.Task.CLEANUP,
692 models.SpecialTask.Task.VERIFY),
693 queue_entry__id=queue_entry.id,
694 is_complete=False)
695 if special_tasks.count() == 0:
696 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000697
showardb8900452009-10-12 20:31:01 +0000698 if unrecovered_hqes:
699 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700700 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000701 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000702 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000703
704
Prathmesh Prabhuc95a66a2016-11-22 19:10:56 -0800705 # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in this
706 # sub-step.
showard65db3932009-10-28 19:54:35 +0000707 def _schedule_special_tasks(self):
708 """
709 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700710
711 Special tasks include PreJobTasks like verify, reset and cleanup.
712 They are created through _schedule_new_jobs and associated with a hqe
713 This method translates SpecialTasks to the appropriate AgentTask and
714 adds them to the dispatchers agents list, so _handle_agents can execute
715 them.
showard65db3932009-10-28 19:54:35 +0000716 """
Prashanth B4ec98672014-05-15 10:44:54 -0700717 # When the host scheduler is responsible for acquisition we only want
718 # to run tasks with leased hosts. All hqe tasks will already have
719 # leased hosts, and we don't want to run frontend tasks till the host
720 # scheduler has vetted the assignment. Note that this doesn't include
721 # frontend tasks with hosts leased by other active hqes.
722 for task in self._job_query_manager.get_prioritized_special_tasks(
723 only_tasks_with_leased_hosts=not _inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000724 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000725 continue
showardd1195652009-12-08 22:21:02 +0000726 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000727
728
showard170873e2009-01-07 00:22:26 +0000729 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000730 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000731 # should never happen
showarded2afea2009-07-07 20:54:07 +0000732 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000733 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000734 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700735 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000736 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000737
738
jadmanski0afbb632008-06-06 21:10:57 +0000739 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000740 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700741 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000742 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000743 if self.host_has_agent(host):
744 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000745 continue
showard8cc058f2009-09-08 16:26:33 +0000746 if self._host_has_scheduled_special_task(host):
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700747 # host will have a special task scheduled on the next tick
showard8cc058f2009-09-08 16:26:33 +0000748 continue
showard170873e2009-01-07 00:22:26 +0000749 if print_message:
showardb18134f2009-03-20 20:52:18 +0000750 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000751 models.SpecialTask.objects.create(
752 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000753 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000754
755
jadmanski0afbb632008-06-06 21:10:57 +0000756 def _recover_hosts(self):
757 # recover "Repair Failed" hosts
758 message = 'Reverifying dead host %s'
759 self._reverify_hosts_where("status = 'Repair Failed'",
760 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000761
762
showard89f84db2009-03-12 20:39:13 +0000763 def _refresh_pending_queue_entries(self):
764 """
765 Lookup the pending HostQueueEntries and call our HostScheduler
766 refresh() method given that list. Return the list.
767
768 @returns A list of pending HostQueueEntries sorted in priority order.
769 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700770 queue_entries = self._job_query_manager.get_pending_queue_entries(
771 only_hostless=not _inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000772 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000773 return []
showard89f84db2009-03-12 20:39:13 +0000774 return queue_entries
775
776
showarda9545c02009-12-18 22:44:26 +0000777 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800778 """Schedule a hostless (suite) job.
779
780 @param queue_entry: The queue_entry representing the hostless job.
781 """
showarda9545c02009-12-18 22:44:26 +0000782 self.add_agent_task(HostlessQueueTask(queue_entry))
Jakob Juelichd615a1e2014-09-04 11:48:36 -0700783
784 # Need to set execution_subdir before setting the status:
785 # After a restart of the scheduler, agents will be restored for HQEs in
786 # Starting, Running, Gathering, Parsing or Archiving. To do this, the
787 # execution_subdir is needed. Therefore it must be set before entering
788 # one of these states.
789 # Otherwise, if the scheduler was interrupted between setting the status
790 # and the execution_subdir, upon it's restart restoring agents would
791 # fail.
792 # Is there a way to get a status in one of these states without going
793 # through this code? Following cases are possible:
794 # - If it's aborted before being started:
795 # active bit will be 0, so there's nothing to parse, it will just be
796 # set to completed by _find_aborting. Critical statuses are skipped.
797 # - If it's aborted or it fails after being started:
798 # It was started, so this code was executed.
799 queue_entry.update_field('execution_subdir', 'hostless')
jamesren47bd7372010-03-13 00:58:17 +0000800 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000801
802
beepscc9fc702013-12-02 12:45:38 -0800803 def _schedule_host_job(self, host, queue_entry):
804 """Schedules a job on the given host.
805
806 1. Assign the host to the hqe, if it isn't already assigned.
807 2. Create a SpecialAgentTask for the hqe.
808 3. Activate the hqe.
809
810 @param queue_entry: The job to schedule.
811 @param host: The host to schedule the job on.
812 """
813 if self.host_has_agent(host):
814 host_agent_task = list(self._host_agents.get(host.id))[0].task
beepscc9fc702013-12-02 12:45:38 -0800815 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700816 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800817
818
Prathmesh Prabhuc95a66a2016-11-22 19:10:56 -0800819 # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in this
820 # sub-step.
showard89f84db2009-03-12 20:39:13 +0000821 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700822 """
823 Find any new HQEs and call schedule_pre_job_tasks for it.
824
825 This involves setting the status of the HQE and creating a row in the
826 db corresponding the the special task, through
827 scheduler_models._queue_special_task. The new db row is then added as
828 an agent to the dispatcher through _schedule_special_tasks and
829 scheduled for execution on the drone through _handle_agents.
830 """
showard89f84db2009-03-12 20:39:13 +0000831 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000832
beepscc9fc702013-12-02 12:45:38 -0800833 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700834 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700835 new_jobs_with_hosts = 0
836 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800837 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700838 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000839
beepscc9fc702013-12-02 12:45:38 -0800840 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000841 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000842 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700843 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000844 else:
beepscc9fc702013-12-02 12:45:38 -0800845 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700846 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700847
Prathmesh Prabhued7ece92016-11-23 11:19:43 -0800848 metrics.Counter(
849 'chromeos/autotest/scheduler/scheduled_jobs_hostless'
850 ).increment_by(new_hostless_jobs)
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800851
beepscc9fc702013-12-02 12:45:38 -0800852 if not host_jobs:
853 return
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800854
Prashanth Bf66d51b2014-05-06 12:42:25 -0700855 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
856 for host_assignment in jobs_with_hosts:
857 self._schedule_host_job(host_assignment.host, host_assignment.job)
858 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800859
Prathmesh Prabhued7ece92016-11-23 11:19:43 -0800860 metrics.Counter(
861 'chromeos/autotest/scheduler/scheduled_jobs_with_hosts'
862 ).increment_by(new_jobs_with_hosts)
863 # TODO(pprabhu): Decide what to do about this metric. Million dollar
864 # question: What happens to jobs that were not matched. Do they stay in
865 # the queue, and get processed right here in the next tick (then we want
866 # a guage corresponding to the number of outstanding unmatched host
867 # jobs), or are they handled somewhere else (then we need a counter
868 # corresponding to failed_to_match_with_hosts jobs).
869 #autotest_stats.Gauge(key).send('new_jobs_without_hosts',
870 # new_jobs_need_hosts -
871 # new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000872
873
Prathmesh Prabhuc95a66a2016-11-22 19:10:56 -0800874 # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in this
875 # sub-step.
showard8cc058f2009-09-08 16:26:33 +0000876 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700877 """
878 Adds agents to the dispatcher.
879
880 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
881 QueueTask for example, will have a job with a control file, and
882 the agent will have methods that poll, abort and check if the queue
883 task is finished. The dispatcher runs the agent_task, as well as
884 other agents in it's _agents member, through _handle_agents, by
885 calling the Agents tick().
886
887 This method creates an agent for each HQE in one of (starting, running,
888 gathering, parsing, archiving) states, and adds it to the dispatcher so
889 it is handled by _handle_agents.
890 """
showardd1195652009-12-08 22:21:02 +0000891 for agent_task in self._get_queue_entry_agent_tasks():
892 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000893
894
Prathmesh Prabhuc95a66a2016-11-22 19:10:56 -0800895 # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in this
896 # sub-step.
showard8cc058f2009-09-08 16:26:33 +0000897 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000898 for entry in scheduler_models.HostQueueEntry.fetch(
899 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000900 task = entry.job.schedule_delayed_callback_task(entry)
901 if task:
showardd1195652009-12-08 22:21:02 +0000902 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000903
904
Prathmesh Prabhuc95a66a2016-11-22 19:10:56 -0800905 # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in this
906 # sub-step.
jadmanski0afbb632008-06-06 21:10:57 +0000907 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700908 """
909 Looks through the afe_host_queue_entries for an aborted entry.
910
911 The aborted bit is set on an HQE in many ways, the most common
912 being when a user requests an abort through the frontend, which
913 results in an rpc from the afe to abort_host_queue_entries.
914 """
jamesrene7c65cb2010-06-08 20:38:10 +0000915 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000916 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700917 where='aborted=1 and complete=0'):
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800918
919 # If the job is running on a shard, let the shard handle aborting
920 # it and sync back the right status.
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -0800921 if entry.job.shard_id is not None and not server_utils.is_shard():
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800922 logging.info('Waiting for shard %s to abort hqe %s',
923 entry.job.shard_id, entry)
924 continue
925
showardf4a2e502009-07-28 20:06:39 +0000926 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800927
928 # The task would have started off with both is_complete and
929 # is_active = False. Aborted tasks are neither active nor complete.
930 # For all currently active tasks this will happen through the agent,
931 # but we need to manually update the special tasks that haven't
932 # started yet, because they don't have agents.
933 models.SpecialTask.objects.filter(is_active=False,
934 queue_entry_id=entry.id).update(is_complete=True)
935
showardd3dc1992009-04-22 21:01:40 +0000936 for agent in self.get_agents_for_entry(entry):
937 agent.abort()
938 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000939 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700940 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000941 for job in jobs_to_stop:
942 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000943
944
beeps8bb1f7d2013-08-05 01:30:09 -0700945 def _find_aborted_special_tasks(self):
946 """
947 Find SpecialTasks that have been marked for abortion.
948
949 Poll the database looking for SpecialTasks that are active
950 and have been marked for abortion, then abort them.
951 """
952
953 # The completed and active bits are very important when it comes
954 # to scheduler correctness. The active bit is set through the prolog
955 # of a special task, and reset through the cleanup method of the
956 # SpecialAgentTask. The cleanup is called both through the abort and
957 # epilog. The complete bit is set in several places, and in general
958 # a hanging job will have is_active=1 is_complete=0, while a special
959 # task which completed will have is_active=0 is_complete=1. To check
960 # aborts we directly check active because the complete bit is set in
961 # several places, including the epilog of agent tasks.
962 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
963 is_aborted=True)
964 for task in aborted_tasks:
965 # There are 2 ways to get the agent associated with a task,
966 # through the host and through the hqe. A special task
967 # always needs a host, but doesn't always need a hqe.
968 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700969 if isinstance(agent.task, agent_task.SpecialAgentTask):
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000970
beeps8bb1f7d2013-08-05 01:30:09 -0700971 # The epilog preforms critical actions such as
972 # queueing the next SpecialTask, requeuing the
973 # hqe etc, however it doesn't actually kill the
974 # monitor process and set the 'done' bit. Epilogs
975 # assume that the job failed, and that the monitor
976 # process has already written an exit code. The
977 # done bit is a necessary condition for
978 # _handle_agents to schedule any more special
979 # tasks against the host, and it must be set
980 # in addition to is_active, is_complete and success.
981 agent.task.epilog()
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000982 agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700983
984
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700985 def _can_start_agent(self, agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000986 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000987 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000988 return True
989 # don't allow any nonzero-process agents to run after we've reached a
990 # limit (this avoids starvation of many-process agents)
991 if have_reached_limit:
992 return False
993 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000994 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000995 agent.task.owner_username,
996 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000997 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000998 return False
showard4c5374f2008-09-04 17:02:56 +0000999 return True
1000
1001
Prathmesh Prabhuc95a66a2016-11-22 19:10:56 -08001002 # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in this
1003 # sub-step.
jadmanski0afbb632008-06-06 21:10:57 +00001004 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -07001005 """
1006 Handles agents of the dispatcher.
1007
1008 Appropriate Agents are added to the dispatcher through
1009 _schedule_running_host_queue_entries. These agents each
1010 have a task. This method runs the agents task through
1011 agent.tick() leading to:
1012 agent.start
1013 prolog -> AgentTasks prolog
1014 For each queue entry:
1015 sets host status/status to Running
1016 set started_on in afe_host_queue_entries
1017 run -> AgentTasks run
1018 Creates PidfileRunMonitor
1019 Queues the autoserv command line for this AgentTask
1020 via the drone manager. These commands are executed
1021 through the drone managers execute actions.
1022 poll -> AgentTasks/BaseAgentTask poll
1023 checks the monitors exit_code.
1024 Executes epilog if task is finished.
1025 Executes AgentTasks _finish_task
1026 finish_task is usually responsible for setting the status
1027 of the HQE/host, and updating it's active and complete fileds.
1028
1029 agent.is_done
1030 Removed the agent from the dispatchers _agents queue.
1031 Is_done checks the finished bit on the agent, that is
1032 set based on the Agents task. During the agents poll
1033 we check to see if the monitor process has exited in
1034 it's finish method, and set the success member of the
1035 task based on this exit code.
1036 """
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001037 num_started_this_tick = 0
1038 num_finished_this_tick = 0
showard4c5374f2008-09-04 17:02:56 +00001039 have_reached_limit = False
1040 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -07001041 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +00001042 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -07001043 self._log_extra_msg('Processing Agent with Host Ids: %s and '
1044 'queue_entry ids:%s' % (agent.host_ids,
1045 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +00001046 if not agent.started:
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001047 if not self._can_start_agent(agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001048 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -07001049 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +00001050 continue
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001051 num_started_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001052 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +00001053 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -07001054 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +00001055 if agent.is_done():
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001056 num_finished_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001057 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +00001058 self.remove_agent(agent)
Prathmesh Prabhued7ece92016-11-23 11:19:43 -08001059
1060 metrics.Counter(
1061 'chromeos/autotest/scheduler/agent_processes_started'
1062 ).increment_by(num_started_this_tick)
1063 metrics.Counter(
1064 'chromeos/autotest/scheduler/agent_processes_finished'
1065 ).increment_by(num_finished_this_tick)
1066 num_agent_processes = _drone_manager.total_running_processes()
1067 metrics.Gauge(
1068 'chromeos/autotest/scheduler/agent_processes'
1069 ).set(num_agent_processes)
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001070 logging.info('%d running processes. %d added this tick.',
Prathmesh Prabhued7ece92016-11-23 11:19:43 -08001071 num_agent_processes, num_started_this_tick)
mbligh36768f02008-02-22 18:28:33 +00001072
1073
Prathmesh Prabhuc95a66a2016-11-22 19:10:56 -08001074 # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in this
1075 # sub-step.
showard29f7cd22009-04-29 21:16:24 +00001076 def _process_recurring_runs(self):
1077 recurring_runs = models.RecurringRun.objects.filter(
1078 start_date__lte=datetime.datetime.now())
1079 for rrun in recurring_runs:
1080 # Create job from template
1081 job = rrun.job
1082 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001083 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001084
1085 host_objects = info['hosts']
1086 one_time_hosts = info['one_time_hosts']
1087 metahost_objects = info['meta_hosts']
1088 dependencies = info['dependencies']
1089 atomic_group = info['atomic_group']
1090
1091 for host in one_time_hosts or []:
1092 this_host = models.Host.create_one_time_host(host.hostname)
1093 host_objects.append(this_host)
1094
1095 try:
1096 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001097 options=options,
showard29f7cd22009-04-29 21:16:24 +00001098 host_objects=host_objects,
1099 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001100 atomic_group=atomic_group)
1101
1102 except Exception, ex:
1103 logging.exception(ex)
1104 #TODO send email
1105
1106 if rrun.loop_count == 1:
1107 rrun.delete()
1108 else:
1109 if rrun.loop_count != 0: # if not infinite loop
1110 # calculate new start_date
1111 difference = datetime.timedelta(seconds=rrun.loop_period)
1112 rrun.start_date = rrun.start_date + difference
1113 rrun.loop_count -= 1
1114 rrun.save()
1115
1116
Simran Basia858a232012-08-21 11:04:37 -07001117SiteDispatcher = utils.import_site_class(
1118 __file__, 'autotest_lib.scheduler.site_monitor_db',
1119 'SiteDispatcher', BaseDispatcher)
1120
1121class Dispatcher(SiteDispatcher):
1122 pass
1123
1124
mbligh36768f02008-02-22 18:28:33 +00001125class Agent(object):
showard77182562009-06-10 00:16:05 +00001126 """
Alex Miller47715eb2013-07-24 03:34:01 -07001127 An agent for use by the Dispatcher class to perform a task. An agent wraps
1128 around an AgentTask mainly to associate the AgentTask with the queue_entry
1129 and host ids.
showard77182562009-06-10 00:16:05 +00001130
1131 The following methods are required on all task objects:
1132 poll() - Called periodically to let the task check its status and
1133 update its internal state. If the task succeeded.
1134 is_done() - Returns True if the task is finished.
1135 abort() - Called when an abort has been requested. The task must
1136 set its aborted attribute to True if it actually aborted.
1137
1138 The following attributes are required on all task objects:
1139 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001140 success - bool, True if this task succeeded.
1141 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1142 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001143 """
1144
1145
showard418785b2009-11-23 20:19:59 +00001146 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001147 """
Alex Miller47715eb2013-07-24 03:34:01 -07001148 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001149 """
showard8cc058f2009-09-08 16:26:33 +00001150 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001151
showard77182562009-06-10 00:16:05 +00001152 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001153 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001154
showard8cc058f2009-09-08 16:26:33 +00001155 self.queue_entry_ids = task.queue_entry_ids
1156 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001157
showard8cc058f2009-09-08 16:26:33 +00001158 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001159 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001160
1161
jadmanski0afbb632008-06-06 21:10:57 +00001162 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001163 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001164 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001165 self.task.poll()
1166 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001167 self.finished = True
showardec113162008-05-08 00:52:49 +00001168
1169
jadmanski0afbb632008-06-06 21:10:57 +00001170 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001171 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001172
1173
showardd3dc1992009-04-22 21:01:40 +00001174 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001175 if self.task:
1176 self.task.abort()
1177 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001178 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001179 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001180
showardd3dc1992009-04-22 21:01:40 +00001181
beeps5e2bb4a2013-10-28 11:26:45 -07001182class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001183 """
1184 Common functionality for QueueTask and HostlessQueueTask
1185 """
    def __init__(self, queue_entries):
        """
        @param queue_entries: HostQueueEntries this task runs; the first
                entry's job is taken as the task's job.
                # assumes all entries belong to that same job — TODO confirm
        """
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001190
1191
showard73ec0442009-02-07 02:05:20 +00001192 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001193 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001194
1195
jamesrenc44ae992010-02-19 00:12:54 +00001196 def _write_control_file(self, execution_path):
1197 control_path = _drone_manager.attach_file_to_execution(
1198 execution_path, self.job.control_file)
1199 return control_path
1200
1201
Aviv Keshet308e7362013-05-21 14:43:16 -07001202 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001203 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001204 execution_path = self.queue_entries[0].execution_path()
1205 control_path = self._write_control_file(execution_path)
1206 hostnames = ','.join(entry.host.hostname
1207 for entry in self.queue_entries
1208 if not entry.is_hostless())
1209
1210 execution_tag = self.queue_entries[0].execution_tag()
1211 params = _autoserv_command_line(
1212 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001213 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001214 _drone_manager.absolute_path(control_path)],
1215 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001216 if self.job.is_image_update_job():
1217 params += ['--image', self.job.update_image_path]
1218
jamesrenc44ae992010-02-19 00:12:54 +00001219 return params
showardd1195652009-12-08 22:21:02 +00001220
1221
1222 @property
1223 def num_processes(self):
1224 return len(self.queue_entries)
1225
1226
1227 @property
1228 def owner_username(self):
1229 return self.job.owner
1230
1231
1232 def _working_directory(self):
1233 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001234
1235
jadmanski0afbb632008-06-06 21:10:57 +00001236 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001237 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001238 keyval_dict = self.job.keyval_dict()
1239 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001240 group_name = self.queue_entries[0].get_group_name()
1241 if group_name:
1242 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001243 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001244 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001245 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001246 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001247
1248
showard35162b02009-03-03 02:17:30 +00001249 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001250 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001251 _drone_manager.write_lines_to_file(error_file_path,
1252 [_LOST_PROCESS_ERROR])
1253
1254
showardd3dc1992009-04-22 21:01:40 +00001255 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001256 if not self.monitor:
1257 return
1258
showardd9205182009-04-27 20:09:55 +00001259 self._write_job_finished()
1260
showard35162b02009-03-03 02:17:30 +00001261 if self.monitor.lost_process:
1262 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001263
jadmanskif7fa2cc2008-10-01 14:13:23 +00001264
showardcbd74612008-11-19 21:42:02 +00001265 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001266 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001267 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001268 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001269 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001270
1271
jadmanskif7fa2cc2008-10-01 14:13:23 +00001272 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001273 if not self.monitor or not self.monitor.has_process():
1274 return
1275
jadmanskif7fa2cc2008-10-01 14:13:23 +00001276 # build up sets of all the aborted_by and aborted_on values
1277 aborted_by, aborted_on = set(), set()
1278 for queue_entry in self.queue_entries:
1279 if queue_entry.aborted_by:
1280 aborted_by.add(queue_entry.aborted_by)
1281 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1282 aborted_on.add(t)
1283
1284 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001285 # TODO(showard): this conditional is now obsolete, we just need to leave
1286 # it in temporarily for backwards compatibility over upgrades. delete
1287 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001288 assert len(aborted_by) <= 1
1289 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001290 aborted_by_value = aborted_by.pop()
1291 aborted_on_value = max(aborted_on)
1292 else:
1293 aborted_by_value = 'autotest_system'
1294 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001295
showarda0382352009-02-11 23:36:43 +00001296 self._write_keyval_after_job("aborted_by", aborted_by_value)
1297 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001298
showardcbd74612008-11-19 21:42:02 +00001299 aborted_on_string = str(datetime.datetime.fromtimestamp(
1300 aborted_on_value))
1301 self._write_status_comment('Job aborted by %s on %s' %
1302 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001303
1304
jadmanski0afbb632008-06-06 21:10:57 +00001305 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001306 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001307 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001308 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001309
1310
jadmanski0afbb632008-06-06 21:10:57 +00001311 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001312 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001313 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001314
1315
class QueueTask(AbstractQueueTask):
    """AbstractQueueTask specialization for jobs that run against hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Validate entry/host states, then mark each host Running/dirty."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)


    def _finish_task(self):
        """Move entries to Gathering while keeping their hosts Running."""
        super(QueueTask, self)._finish_task()

        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Extend the base autoserv invocation with SSP, source-build and
        parent-job flags, plus the job repo URL check.
        """
        args = super(QueueTask, self)._command_line()
        # Server-side packaging applies only to server-type control files on
        # jobs that have not explicitly opted out.  NOTE: the `!= False`
        # comparison is deliberate — require_ssp may be None (meaning
        # "default on"), so identity/truthiness tests would not be equivalent.
        wants_ssp = (_enable_ssp_container and
                     self.job.control_type == control_data.CONTROL_TYPE.SERVER
                     and self.job.require_ssp != False)
        if wants_ssp:
            args += ['--require-ssp']
            source_build = self.job.keyval_dict().get('test_source_build', None)
            if source_build:
                args += ['--test_source_build', source_build]
        if self.job.parent_job_id:
            args += ['--parent_job_id', str(self.job.parent_job_id)]
        return args + ['--verify_job_repo_url']
Alex Miller9f01d5d2013-08-08 02:26:01 -07001360
1361
class HostlessQueueTask(AbstractQueueTask):
    """AbstractQueueTask specialization for jobs with no hosts attached."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        # Intentionally a pure pass-through: hostless jobs need none of the
        # host-status bookkeeping that QueueTask.prolog adds.
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()

        # A freshly added job sits in Starting until the scheduler launches
        # an autoserv process for it (which flips it to Running).  Limits
        # such as max_hostless_jobs_per_drone can leave jobs parked in
        # Starting with no process at all.  For those, execution_subdir was
        # never set, so handing them to the parser would raise — skip the
        # Parsing transition and leave them in Starting.
        entry = self.queue_entries[0]
        if entry.status == models.HostQueueEntry.Status.STARTING:
            return
        entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001389
1390
# Script entry point: main() is defined elsewhere in this module and runs
# the scheduler loop when this file is executed directly.
if __name__ == '__main__':
    main()