blob: 0be6292cbfb2d61d3efd8c0eba7fa0e293c2d045 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Richard Barnetteffed1722016-05-18 15:57:22 -07002
3#pylint: disable=C0111
4
mbligh36768f02008-02-22 18:28:33 +00005"""
6Autotest scheduler
7"""
showard909c7a62008-07-15 21:52:38 +00008
Dan Shif6c65bd2014-08-29 16:15:07 -07009import datetime
10import gc
11import logging
12import optparse
13import os
14import signal
15import sys
16import time
showard402934a2009-12-21 22:20:47 +000017
Alex Miller05d7b4c2013-03-04 07:49:38 -080018import common
showard21baa452008-10-21 00:08:39 +000019from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000020
21import django.db
Aviv Keshet65fed072016-06-29 10:20:55 -070022from chromite.lib import metrics
Richard Barnetteffed1722016-05-18 15:57:22 -070023from chromite.lib import ts_mon_config
showard402934a2009-12-21 22:20:47 +000024
Dan Shiec1d47d2015-02-13 11:38:13 -080025from autotest_lib.client.common_lib import control_data
Prashanth B0e960282014-05-13 19:38:28 -070026from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070027from autotest_lib.client.common_lib import utils
Prashanth B0e960282014-05-13 19:38:28 -070028from autotest_lib.frontend.afe import models, rpc_utils
Fang Dengc330bee2014-10-21 18:10:55 -070029from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070030from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
31from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070032from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070033from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070034from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000035from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080036from autotest_lib.scheduler import status_server, scheduler_config
Aviv Keshet308e7362013-05-21 14:43:16 -070037from autotest_lib.server import autoserv_utils
Dan Shi114e1722016-01-10 18:12:53 -080038from autotest_lib.server import system_utils
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080039from autotest_lib.server import utils as server_utils
Dan Shicf2e8dd2015-05-07 17:18:48 -070040from autotest_lib.site_utils import metadata_reporter
Dan Shib9144a42014-12-01 16:09:32 -080041from autotest_lib.site_utils import server_manager_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080042
Dan Shicf2e8dd2015-05-07 17:18:48 -070043
# Pidfile prefixes used by utils.write_pid / program_is_alive to detect an
# already-running scheduler (see initialize()).
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Overwritten with the results_dir positional argument in
# main_without_exception_handling().
RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Populated in initialize(); torn down at the end of
# main_without_exception_handling().
_db_manager = None
_db = None
# Flipped to True by handle_signal() to request a graceful shutdown.
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
# Set to the drone_manager singleton in initialize_globals().
_drone_manager = None
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)

_enable_ssp_container = global_config.global_config.get_config_value(
        'AUTOSERV', 'enable_ssp_container', type=bool,
        default=True)
mbligh36768f02008-02-22 18:28:33 +000081
mbligh83c1e9e2009-05-01 23:10:41 +000082def _site_init_monitor_db_dummy():
83 return {}
84
85
def _verify_default_drone_set_exists():
    """Raise SchedulerError when drone sets are on but no default is named."""
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise scheduler_lib.SchedulerError(
            'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000091
92
def _sanity_check():
    """Validate configuration consistency before the scheduler starts up."""
    _verify_default_drone_set_exists()
96
97
def main():
    """Scheduler entry point.

    Delegates to main_without_exception_handling(); logs any escaping
    exception (re-raising it) and always removes the scheduler pid file
    on the way out.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # Deliberate exits propagate without extra logging.
            raise
        except BaseException:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000109
110
def main_without_exception_handling():
    """Parse options, verify configuration and drive the scheduler loop.

    Exits via sys.exit(1) when the scheduler is disabled in global_config;
    returns early (after printing usage) when the results_dir argument is
    missing.  Otherwise ticks the Dispatcher until a shutdown is requested
    through a signal (handle_signal) or the status server.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        # Exactly one positional argument (results_dir) is required.
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Optional site hook; falls back to the no-op dummy when no site module
    # is present.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        # Swap in the dummy autoserv and flag testing mode for initialize().
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    # Start the thread to report metadata.
    metadata_reporter.start()

    with ts_mon_config.SetupTsMonGlobalState('autotest_scheduler',
                                             indirect=True):
        try:
            initialize()
            dispatcher = Dispatcher()
            dispatcher.initialize(recover_hosts=options.recover_hosts)
            minimum_tick_sec = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)

            # Main loop: one dispatcher tick per iteration, padded with sleep
            # so each tick takes at least minimum_tick_sec.
            while not _shutdown and not server._shutdown_scheduler:
                start = time.time()
                dispatcher.tick()
                curr_tick_sec = time.time() - start
                if minimum_tick_sec > curr_tick_sec:
                    time.sleep(minimum_tick_sec - curr_tick_sec)
                else:
                    time.sleep(0.0001)
        except server_manager_utils.ServerActionError as e:
            # This error is expected when the server is not in primary status
            # for scheduler role. Thus do not send email for it.
            logging.exception(e)
        except Exception:
            email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    # Orderly teardown: stop reporters/servers, flush mail, release drones
    # and close the database connection.
    metadata_reporter.abort()
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000204
205
def handle_signal(signum, frame):
    """Signal handler that requests a graceful scheduler shutdown.

    @param signum: Received signal number (unused).
    @param frame: Interrupted stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000210
211
def initialize():
    """One-time scheduler startup.

    Aborts (sys.exit(1)) if another monitor_db instance owns the pid file;
    otherwise writes our pid, connects to the database, installs signal
    handlers, initializes the drone manager and scheduler models.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point the scheduler at the stress-test database instead of the
        # production one.
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    # If server database is enabled, check if the server has role `scheduler`.
    # If the server does not have scheduler role, exception will be raised and
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    # SIGINT/SIGTERM both request a graceful shutdown via handle_signal.
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    drone_list = system_utils.get_drones()
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000251
252
def initialize_globals():
    """Bind the module-level _drone_manager to the drone_manager singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
256
257
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line for running a job in the lab.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns The autoserv command line as a list of executable + parameters.
    """
    return autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose, in_lab=True)
showard87ba02a2009-04-20 19:37:32 +0000276
277
Simran Basia858a232012-08-21 11:04:37 -0700278class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800279
280
    def __init__(self):
        # Active Agent objects plus per-host and per-queue-entry indexes of
        # those agents (see _register_agent_for_ids).
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db, _drone_manager)
        self._host_agents = {}
        self._queue_entry_agents = {}
        # Tick bookkeeping: count of ticks plus throttling state for the
        # periodic garbage-collector stats report.
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        # Debug-logging switches consumed by _log_tick_msg/_log_extra_msg.
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
312
mbligh36768f02008-02-22 18:28:33 +0000313
    def initialize(self, recover_hosts=True):
        """Prepare the dispatcher for ticking: cleanups, then recovery.

        @param recover_hosts: If True, also attempt dead-host recovery after
                process recovery.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()
        # Execute all actions queued in the cleanup tasks. Scheduler tick will
        # run a refresh task first. If there is any action in the queue, refresh
        # will raise an exception.
        _drone_manager.execute_actions()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000327
328
Simran Basi0ec94dd2012-08-28 09:50:10 -0700329 def _log_tick_msg(self, msg):
330 if self._tick_debug:
331 logging.debug(msg)
332
333
Simran Basidef92872012-09-20 13:34:34 -0700334 def _log_extra_msg(self, msg):
335 if self._extra_debugging:
336 logging.debug(msg)
337
    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/scheduler/tick_durations/tick')
    def tick(self):
        """
        Run one scheduler iteration, logging when each major step begins.

        The per-step _log_tick_msg calls let us figure out where most of the
        tick time is being spent.  Step ordering matters: see the comment
        about _run_cleanup below.
        """
        system_utils.DroneCache.refresh()
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
        _drone_manager.trigger_refresh()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
        _drone_manager.sync_refresh()
        # _run_cleanup must be called between drone_manager.sync_refresh, and
        # drone_manager.execute_actions, as sync_refresh will clear the calls
        # queued in drones. Therefore, any action that calls drone.queue_call
        # to add calls to the drone._calls, should be after drone refresh is
        # completed and before drone_manager.execute_actions at the end of the
        # tick.
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in
        # this sub-step.
        email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in
        # this sub-step.
        django.db.reset_queries()
        self._tick_count += 1
        metrics.Counter('chromeos/autotest/scheduler/tick').increment()
mbligh36768f02008-02-22 18:28:33 +0000392
showard97aed502008-11-04 02:01:24 +0000393
    # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in this
    # sub-step.
    def _run_cleanup(self):
        """Run the periodic and 24-hour cleanups if their intervals elapsed."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000399
mbligh36768f02008-02-22 18:28:33 +0000400
Prathmesh Prabhuc95a66a2016-11-22 19:10:56 -0800401 # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in this
402 # sub-step.
showardf13a9e22009-12-18 22:54:09 +0000403 def _garbage_collection(self):
404 threshold_time = time.time() - self._seconds_between_garbage_stats
405 if threshold_time < self._last_garbage_stats_time:
406 # Don't generate these reports very often.
407 return
408
409 self._last_garbage_stats_time = time.time()
410 # Force a full level 0 collection (because we can, it doesn't hurt
411 # at this interval).
412 gc.collect()
413 logging.info('Logging garbage collector stats on tick %d.',
414 self._tick_count)
415 gc_stats._log_garbage_collector_stats()
416
417
showard170873e2009-01-07 00:22:26 +0000418 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
419 for object_id in object_ids:
420 agent_dict.setdefault(object_id, set()).add(agent)
421
422
423 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
424 for object_id in object_ids:
425 assert object_id in agent_dict
426 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700427 # If an ID has no more active agent associated, there is no need to
428 # keep it in the dictionary. Otherwise, scheduler will keep an
429 # unnecessarily big dictionary until being restarted.
430 if not agent_dict[object_id]:
431 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000432
433
showardd1195652009-12-08 22:21:02 +0000434 def add_agent_task(self, agent_task):
beeps8bb1f7d2013-08-05 01:30:09 -0700435 """
436 Creates and adds an agent to the dispatchers list.
437
438 In creating the agent we also pass on all the queue_entry_ids and
439 host_ids from the special agent task. For every agent we create, we
440 add it to 1. a dict against the queue_entry_ids given to it 2. A dict
441 against the host_ids given to it. So theoritically, a host can have any
442 number of agents associated with it, and each of them can have any
443 special agent task, though in practice we never see > 1 agent/task per
444 host at any time.
445
446 @param agent_task: A SpecialTask for the agent to manage.
447 """
showardd1195652009-12-08 22:21:02 +0000448 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000449 self._agents.append(agent)
450 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000451 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
452 self._register_agent_for_ids(self._queue_entry_agents,
453 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000454
showard170873e2009-01-07 00:22:26 +0000455
456 def get_agents_for_entry(self, queue_entry):
457 """
458 Find agents corresponding to the specified queue_entry.
459 """
showardd3dc1992009-04-22 21:01:40 +0000460 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000461
462
463 def host_has_agent(self, host):
464 """
465 Determine if there is currently an Agent present using this host.
466 """
467 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000468
469
jadmanski0afbb632008-06-06 21:10:57 +0000470 def remove_agent(self, agent):
471 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000472 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
473 agent)
474 self._unregister_agent_for_ids(self._queue_entry_agents,
475 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000476
477
showard8cc058f2009-09-08 16:26:33 +0000478 def _host_has_scheduled_special_task(self, host):
479 return bool(models.SpecialTask.objects.filter(host__id=host.id,
480 is_active=False,
481 is_complete=False))
482
483
    def _recover_processes(self):
        """Recover running autoserv processes after a scheduler restart.

        Builds recovery agent tasks, registers their pidfiles, refreshes
        drone state, then recovers tasks and pending/verifying entries
        before reverifying remaining hosts.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000496
showard170873e2009-01-07 00:22:26 +0000497
showardd1195652009-12-08 22:21:02 +0000498 def _create_recovery_agent_tasks(self):
499 return (self._get_queue_entry_agent_tasks()
500 + self._get_special_task_agent_tasks(is_active=True))
501
502
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        # Tally of entries per status, reported as a gauge metric below.
        hqe_count_by_status = {}
        for entry in queue_entries:
            hqe_count_by_status[entry.status] = (
                hqe_count_by_status.get(entry.status, 0) + 1)
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            # One AgentTask may cover a whole group of entries; mark them all
            # used so they are not scheduled twice.
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)

        for status, count in hqe_count_by_status.iteritems():
            metrics.Gauge(
                'chromeos/autotest/scheduler/active_host_queue_entries'
            ).set(count, fields={'status': status})

        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000548
549
showardd1195652009-12-08 22:21:02 +0000550 def _get_special_task_agent_tasks(self, is_active=False):
551 special_tasks = models.SpecialTask.objects.filter(
552 is_active=is_active, is_complete=False)
553 return [self._get_agent_task_for_special_task(task)
554 for task in special_tasks]
555
556
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        Dispatches on queue_entry.status: Starting/Running get a QueueTask
        (or HostlessQueueTask), Gathering/Parsing/Archiving get the matching
        post-job task.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        @raises SchedulerError: if the entry's status is not one handled here.
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000582
583
584 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000585 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
586 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000587 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000588 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000589 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000590 if using_host:
showardd1195652009-12-08 22:21:02 +0000591 self._assert_host_has_no_agent(task_entry)
592
593
594 def _assert_host_has_no_agent(self, entry):
595 """
596 @param entry: a HostQueueEntry or a SpecialTask
597 """
598 if self.host_has_agent(entry.host):
599 agent = tuple(self._host_agents.get(entry.host.id))[0]
Prashanth B0e960282014-05-13 19:38:28 -0700600 raise scheduler_lib.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000601 'While scheduling %s, host %s already has a host agent %s'
602 % (entry, entry.host, agent.task))
603
604
605 def _get_agent_task_for_special_task(self, special_task):
606 """
607 Construct an AgentTask class to run the given SpecialTask and add it
608 to this dispatcher.
beeps8bb1f7d2013-08-05 01:30:09 -0700609
MK Ryu35d661e2014-09-25 17:44:10 -0700610 A special task is created through schedule_special_tasks, but only if
beeps8bb1f7d2013-08-05 01:30:09 -0700611 the host doesn't already have an agent. This happens through
612 add_agent_task. All special agent tasks are given a host on creation,
613 and a Null hqe. To create a SpecialAgentTask object, you need a
614 models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
615 object contains a hqe it's passed on to the special agent task, which
616 creates a HostQueueEntry and saves it as it's queue_entry.
617
showardd1195652009-12-08 22:21:02 +0000618 @param special_task: a models.SpecialTask instance
619 @returns an AgentTask to run this SpecialTask
620 """
621 self._assert_host_has_no_agent(special_task)
622
beeps5e2bb4a2013-10-28 11:26:45 -0700623 special_agent_task_classes = (prejob_task.CleanupTask,
624 prejob_task.VerifyTask,
625 prejob_task.RepairTask,
626 prejob_task.ResetTask,
627 prejob_task.ProvisionTask)
628
showardd1195652009-12-08 22:21:02 +0000629 for agent_task_class in special_agent_task_classes:
630 if agent_task_class.TASK_TYPE == special_task.task:
631 return agent_task_class(task=special_task)
632
Prashanth B0e960282014-05-13 19:38:28 -0700633 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800634 'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000635
636
637 def _register_pidfiles(self, agent_tasks):
638 for agent_task in agent_tasks:
639 agent_task.register_necessary_pidfiles()
640
641
642 def _recover_tasks(self, agent_tasks):
643 orphans = _drone_manager.get_orphaned_autoserv_processes()
644
645 for agent_task in agent_tasks:
646 agent_task.recover()
647 if agent_task.monitor and agent_task.monitor.has_process():
648 orphans.discard(agent_task.monitor.get_process())
649 self.add_agent_task(agent_task)
650
651 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000652
653
showard8cc058f2009-09-08 16:26:33 +0000654 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000655 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
656 % status):
showard0db3d432009-10-12 20:29:15 +0000657 if entry.status == status and not self.get_agents_for_entry(entry):
658 # The status can change during iteration, e.g., if job.run()
659 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000660 yield entry
661
662
showard6878e8b2009-07-20 22:37:45 +0000663 def _check_for_remaining_orphan_processes(self, orphans):
664 if not orphans:
665 return
666 subject = 'Unrecovered orphan autoserv processes remain'
667 message = '\n'.join(str(process) for process in orphans)
668 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000669
670 die_on_orphans = global_config.global_config.get_config_value(
671 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
672
673 if die_on_orphans:
674 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000675
showard170873e2009-01-07 00:22:26 +0000676
showard8cc058f2009-09-08 16:26:33 +0000677 def _recover_pending_entries(self):
678 for entry in self._get_unassigned_entries(
679 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000680 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000681 entry.on_pending()
682
683
showardb8900452009-10-12 20:31:01 +0000684 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000685 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000686 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
687 unrecovered_hqes = []
688 for queue_entry in queue_entries:
689 special_tasks = models.SpecialTask.objects.filter(
690 task__in=(models.SpecialTask.Task.CLEANUP,
691 models.SpecialTask.Task.VERIFY),
692 queue_entry__id=queue_entry.id,
693 is_complete=False)
694 if special_tasks.count() == 0:
695 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000696
showardb8900452009-10-12 20:31:01 +0000697 if unrecovered_hqes:
698 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700699 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000700 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000701 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000702
703
Prathmesh Prabhuc95a66a2016-11-22 19:10:56 -0800704 # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in this
705 # sub-step.
showard65db3932009-10-28 19:54:35 +0000706 def _schedule_special_tasks(self):
707 """
708 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700709
710 Special tasks include PreJobTasks like verify, reset and cleanup.
711 They are created through _schedule_new_jobs and associated with a hqe
712 This method translates SpecialTasks to the appropriate AgentTask and
713 adds them to the dispatchers agents list, so _handle_agents can execute
714 them.
showard65db3932009-10-28 19:54:35 +0000715 """
Prashanth B4ec98672014-05-15 10:44:54 -0700716 # When the host scheduler is responsible for acquisition we only want
717 # to run tasks with leased hosts. All hqe tasks will already have
718 # leased hosts, and we don't want to run frontend tasks till the host
719 # scheduler has vetted the assignment. Note that this doesn't include
720 # frontend tasks with hosts leased by other active hqes.
721 for task in self._job_query_manager.get_prioritized_special_tasks(
722 only_tasks_with_leased_hosts=not _inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000723 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000724 continue
showardd1195652009-12-08 22:21:02 +0000725 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000726
727
showard170873e2009-01-07 00:22:26 +0000728 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000729 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000730 # should never happen
showarded2afea2009-07-07 20:54:07 +0000731 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000732 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000733 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700734 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000735 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000736
737
jadmanski0afbb632008-06-06 21:10:57 +0000738 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000739 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700740 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000741 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000742 if self.host_has_agent(host):
743 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000744 continue
showard8cc058f2009-09-08 16:26:33 +0000745 if self._host_has_scheduled_special_task(host):
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700746 # host will have a special task scheduled on the next tick
showard8cc058f2009-09-08 16:26:33 +0000747 continue
showard170873e2009-01-07 00:22:26 +0000748 if print_message:
showardb18134f2009-03-20 20:52:18 +0000749 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000750 models.SpecialTask.objects.create(
751 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000752 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000753
754
jadmanski0afbb632008-06-06 21:10:57 +0000755 def _recover_hosts(self):
756 # recover "Repair Failed" hosts
757 message = 'Reverifying dead host %s'
758 self._reverify_hosts_where("status = 'Repair Failed'",
759 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000760
761
    def _refresh_pending_queue_entries(self):
        """
        Fetch the pending HostQueueEntries this dispatcher should consider
        for scheduling and return them.

        When hosts are acquired by a separate host scheduler process
        (_inline_host_acquisition is False) only hostless entries are
        fetched here.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._job_query_manager.get_pending_queue_entries(
                only_hostless=not _inline_host_acquisition)
        if not queue_entries:
            return []
        return queue_entries
774
775
    def _schedule_hostless_job(self, queue_entry):
        """Schedule a hostless (suite) job.

        @param queue_entry: The queue_entry representing the hostless job.
        """
        self.add_agent_task(HostlessQueueTask(queue_entry))

        # Need to set execution_subdir before setting the status:
        # After a restart of the scheduler, agents will be restored for HQEs in
        # Starting, Running, Gathering, Parsing or Archiving. To do this, the
        # execution_subdir is needed. Therefore it must be set before entering
        # one of these states.
        # Otherwise, if the scheduler was interrupted between setting the status
        # and the execution_subdir, upon its restart restoring agents would
        # fail.
        # Is there a way to get a status in one of these states without going
        # through this code? Following cases are possible:
        # - If it's aborted before being started:
        #   active bit will be 0, so there's nothing to parse, it will just be
        #   set to completed by _find_aborting. Critical statuses are skipped.
        # - If it's aborted or it fails after being started:
        #   It was started, so this code was executed.
        queue_entry.update_field('execution_subdir', 'hostless')
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000800
801
    def _schedule_host_job(self, host, queue_entry):
        """Schedules a job on the given host.

        1. Assign the host to the hqe, if it isn't already assigned.
        2. Create a SpecialAgentTask for the hqe.
        3. Activate the hqe.

        @param queue_entry: The job to schedule.
        @param host: The host to schedule the job on.
        """
        if self.host_has_agent(host):
            # The host is still busy with another agent; don't double-book.
            # Report it and leave the hqe to acquire a host on a later tick.
            host_agent_task = list(self._host_agents.get(host.id))[0].task
            subject = 'Host with agents assigned to an HQE'
            message = ('HQE: %s assigned host %s, but the host has '
                       'agent: %s for queue_entry %s. The HQE '
                       'will have to try and acquire a host next tick ' %
                       (queue_entry, host.hostname, host_agent_task,
                        host_agent_task.queue_entry))
            email_manager.manager.enqueue_notify_email(subject, message)
        else:
            self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800823
824
Prathmesh Prabhuc95a66a2016-11-22 19:10:56 -0800825 # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in this
826 # sub-step.
showard89f84db2009-03-12 20:39:13 +0000827 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700828 """
829 Find any new HQEs and call schedule_pre_job_tasks for it.
830
831 This involves setting the status of the HQE and creating a row in the
832 db corresponding the the special task, through
833 scheduler_models._queue_special_task. The new db row is then added as
834 an agent to the dispatcher through _schedule_special_tasks and
835 scheduled for execution on the drone through _handle_agents.
836 """
showard89f84db2009-03-12 20:39:13 +0000837 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000838
beepscc9fc702013-12-02 12:45:38 -0800839 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700840 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700841 new_jobs_with_hosts = 0
842 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800843 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700844 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000845
beepscc9fc702013-12-02 12:45:38 -0800846 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000847 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000848 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700849 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000850 else:
beepscc9fc702013-12-02 12:45:38 -0800851 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700852 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700853
Prathmesh Prabhued7ece92016-11-23 11:19:43 -0800854 metrics.Counter(
855 'chromeos/autotest/scheduler/scheduled_jobs_hostless'
856 ).increment_by(new_hostless_jobs)
beepscc9fc702013-12-02 12:45:38 -0800857 if not host_jobs:
858 return
Prashanth Bf66d51b2014-05-06 12:42:25 -0700859 if not _inline_host_acquisition:
860 message = ('Found %s jobs that need hosts though '
861 '_inline_host_acquisition=%s. Will acquire hosts.' %
862 ([str(job) for job in host_jobs],
863 _inline_host_acquisition))
864 email_manager.manager.enqueue_notify_email(
865 'Processing unexpected host acquisition requests', message)
866 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
867 for host_assignment in jobs_with_hosts:
868 self._schedule_host_job(host_assignment.host, host_assignment.job)
869 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800870
Prathmesh Prabhued7ece92016-11-23 11:19:43 -0800871 metrics.Counter(
872 'chromeos/autotest/scheduler/scheduled_jobs_with_hosts'
873 ).increment_by(new_jobs_with_hosts)
874 # TODO(pprabhu): Decide what to do about this metric. Million dollar
875 # question: What happens to jobs that were not matched. Do they stay in
876 # the queue, and get processed right here in the next tick (then we want
877 # a guage corresponding to the number of outstanding unmatched host
878 # jobs), or are they handled somewhere else (then we need a counter
879 # corresponding to failed_to_match_with_hosts jobs).
880 #autotest_stats.Gauge(key).send('new_jobs_without_hosts',
881 # new_jobs_need_hosts -
882 # new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000883
884
    # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in this
    # sub-step.
    def _schedule_running_host_queue_entries(self):
        """
        Adds agents to the dispatcher.

        Any AgentTask, like the QueueTask, is wrapped in an Agent. The
        QueueTask for example, will have a job with a control file, and
        the agent will have methods that poll, abort and check if the queue
        task is finished. The dispatcher runs the agent_task, as well as
        other agents in its _agents member, through _handle_agents, by
        calling the Agent's tick().

        This method creates an agent for each HQE in one of (starting, running,
        gathering, parsing, archiving) states, and adds it to the dispatcher so
        it is handled by _handle_agents.
        """
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000904
905
Prathmesh Prabhuc95a66a2016-11-22 19:10:56 -0800906 # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in this
907 # sub-step.
showard8cc058f2009-09-08 16:26:33 +0000908 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000909 for entry in scheduler_models.HostQueueEntry.fetch(
910 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000911 task = entry.job.schedule_delayed_callback_task(entry)
912 if task:
showardd1195652009-12-08 22:21:02 +0000913 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000914
915
    # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in this
    # sub-step.
    def _find_aborting(self):
        """
        Looks through the afe_host_queue_entries for an aborted entry.

        The aborted bit is set on an HQE in many ways, the most common
        being when a user requests an abort through the frontend, which
        results in an rpc from the afe to abort_host_queue_entries.

        Entries belonging to jobs running on a shard are skipped on the
        master; the shard aborts them and syncs back the right status.
        """
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted=1 and complete=0'):

            # If the job is running on a shard, let the shard handle aborting
            # it and sync back the right status.
            if entry.job.shard_id is not None and not server_utils.is_shard():
                logging.info('Waiting for shard %s to abort hqe %s',
                             entry.job.shard_id, entry)
                continue

            logging.info('Aborting %s', entry)

            # The task would have started off with both is_complete and
            # is_active = False. Aborted tasks are neither active nor complete.
            # For all currently active tasks this will happen through the agent,
            # but we need to manually update the special tasks that haven't
            # started yet, because they don't have agents.
            models.SpecialTask.objects.filter(is_active=False,
                    queue_entry_id=entry.id).update(is_complete=True)

            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000954
955
    def _find_aborted_special_tasks(self):
        """
        Find SpecialTasks that have been marked for abortion.

        Poll the database looking for SpecialTasks that are active
        and have been marked for abortion, then abort them.
        """

        # The completed and active bits are very important when it comes
        # to scheduler correctness. The active bit is set through the prolog
        # of a special task, and reset through the cleanup method of the
        # SpecialAgentTask. The cleanup is called both through the abort and
        # epilog. The complete bit is set in several places, and in general
        # a hanging job will have is_active=1 is_complete=0, while a special
        # task which completed will have is_active=0 is_complete=1. To check
        # aborts we directly check active because the complete bit is set in
        # several places, including the epilog of agent tasks.
        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
                                                          is_aborted=True)
        for task in aborted_tasks:
            # There are 2 ways to get the agent associated with a task,
            # through the host and through the hqe. A special task
            # always needs a host, but doesn't always need a hqe.
            for agent in self._host_agents.get(task.host.id, []):
                if isinstance(agent.task, agent_task.SpecialAgentTask):

                    # The epilog performs critical actions such as
                    # queueing the next SpecialTask, requeuing the
                    # hqe etc, however it doesn't actually kill the
                    # monitor process and set the 'done' bit. Epilogs
                    # assume that the job failed, and that the monitor
                    # process has already written an exit code. The
                    # done bit is a necessary condition for
                    # _handle_agents to schedule any more special
                    # tasks against the host, and it must be set
                    # in addition to is_active, is_complete and success.
                    agent.task.epilog()
                    agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700994
995
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700996 def _can_start_agent(self, agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000997 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000998 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000999 return True
1000 # don't allow any nonzero-process agents to run after we've reached a
1001 # limit (this avoids starvation of many-process agents)
1002 if have_reached_limit:
1003 return False
1004 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001005 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +00001006 agent.task.owner_username,
1007 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +00001008 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001009 return False
showard4c5374f2008-09-04 17:02:56 +00001010 return True
1011
1012
    # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in this
    # sub-step.
    def _handle_agents(self):
        """
        Handles agents of the dispatcher.

        Appropriate Agents are added to the dispatcher through
        _schedule_running_host_queue_entries. These agents each
        have a task. This method runs the agents task through
        agent.tick() leading to:
            agent.start
                prolog -> AgentTasks prolog
                For each queue entry:
                    sets host status/status to Running
                    set started_on in afe_host_queue_entries
                run -> AgentTasks run
                Creates PidfileRunMonitor
                Queues the autoserv command line for this AgentTask
                via the drone manager. These commands are executed
                through the drone managers execute actions.
            poll -> AgentTasks/BaseAgentTask poll
                checks the monitors exit_code.
                Executes epilog if task is finished.
                Executes AgentTasks _finish_task
                finish_task is usually responsible for setting the status
                of the HQE/host, and updating its active and complete fields.

        agent.is_done
            Removes the agent from the dispatcher's _agents queue.
            Is_done checks the finished bit on the agent, that is
            set based on the Agents task. During the agents poll
            we check to see if the monitor process has exited in
            its finish method, and set the success member of the
            task based on this exit code.
        """
        num_started_this_tick = 0
        num_finished_this_tick = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                # Throttle: once one agent can't start, stop starting any
                # further nonzero-process agents this tick.
                if not self._can_start_agent(agent, have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_tick += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                num_finished_this_tick += agent.task.num_processes
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)

        metrics.Counter(
            'chromeos/autotest/scheduler/agent_processes_started'
        ).increment_by(num_started_this_tick)
        metrics.Counter(
            'chromeos/autotest/scheduler/agent_processes_finished'
        ).increment_by(num_finished_this_tick)
        num_agent_processes = _drone_manager.total_running_processes()
        metrics.Gauge(
            'chromeos/autotest/scheduler/agent_processes'
        ).set(num_agent_processes)
        logging.info('%d running processes. %d added this tick.',
                     num_agent_processes, num_started_this_tick)
mbligh36768f02008-02-22 18:28:33 +00001083
1084
Prathmesh Prabhuc95a66a2016-11-22 19:10:56 -08001085 # TODO(pprabhu) crbug.com/667171: Add back metric for % time spent in this
1086 # sub-step.
showard29f7cd22009-04-29 21:16:24 +00001087 def _process_recurring_runs(self):
1088 recurring_runs = models.RecurringRun.objects.filter(
1089 start_date__lte=datetime.datetime.now())
1090 for rrun in recurring_runs:
1091 # Create job from template
1092 job = rrun.job
1093 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001094 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001095
1096 host_objects = info['hosts']
1097 one_time_hosts = info['one_time_hosts']
1098 metahost_objects = info['meta_hosts']
1099 dependencies = info['dependencies']
1100 atomic_group = info['atomic_group']
1101
1102 for host in one_time_hosts or []:
1103 this_host = models.Host.create_one_time_host(host.hostname)
1104 host_objects.append(this_host)
1105
1106 try:
1107 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001108 options=options,
showard29f7cd22009-04-29 21:16:24 +00001109 host_objects=host_objects,
1110 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001111 atomic_group=atomic_group)
1112
1113 except Exception, ex:
1114 logging.exception(ex)
1115 #TODO send email
1116
1117 if rrun.loop_count == 1:
1118 rrun.delete()
1119 else:
1120 if rrun.loop_count != 0: # if not infinite loop
1121 # calculate new start_date
1122 difference = datetime.timedelta(seconds=rrun.loop_period)
1123 rrun.start_date = rrun.start_date + difference
1124 rrun.loop_count -= 1
1125 rrun.save()
1126
1127
# Site-customization hook: load 'SiteDispatcher' from the optional
# autotest_lib.scheduler.site_monitor_db module; BaseDispatcher is passed in,
# presumably as the fallback/base when no site class exists — see
# utils.import_site_class for the exact semantics.
SiteDispatcher = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteDispatcher', BaseDispatcher)


class Dispatcher(SiteDispatcher):
    """Concrete dispatcher type used by the scheduler (site class or base)."""
    pass
1134
1135
class Agent(object):
    """
    Wrapper the Dispatcher uses to drive a single AgentTask.

    The Agent primarily associates its AgentTask with the queue_entry and
    host ids that the task covers, and tracks whether the task has been
    started and has finished.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state. If the task succeeded.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested. The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: An instance of an AgentTask.
        """
        self.task = task

        # Filled in by Dispatcher.add_agent().
        self.dispatcher = None

        # Mirror the task's coverage so the dispatcher can index agents by
        # queue entry and host.
        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        """Poll the underlying task once; record completion when done."""
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        """@returns True once the underlying task has completed."""
        return self.finished


    def abort(self):
        """Forward an abort request to the task.

        Tasks can choose to ignore aborts; the agent only finishes if the
        task actually set its aborted flag.
        """
        if not self.task:
            return
        self.task.abort()
        if self.task.aborted:
            self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001191
showardd3dc1992009-04-22 21:01:40 +00001192
beeps5e2bb4a2013-10-28 11:26:45 -07001193class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001194 """
1195 Common functionality for QueueTask and HostlessQueueTask
1196 """
    def __init__(self, queue_entries):
        """
        @param queue_entries: the HostQueueEntries this task will run. The
                task's job is taken from the first entry; presumably all
                entries belong to that same job — TODO confirm with callers.
        """
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001201
1202
    def _keyval_path(self):
        """@returns path of the job keyval file inside the working directory."""
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001205
1206
jamesrenc44ae992010-02-19 00:12:54 +00001207 def _write_control_file(self, execution_path):
1208 control_path = _drone_manager.attach_file_to_execution(
1209 execution_path, self.job.control_file)
1210 return control_path
1211
1212
Aviv Keshet308e7362013-05-21 14:43:16 -07001213 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001214 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001215 execution_path = self.queue_entries[0].execution_path()
1216 control_path = self._write_control_file(execution_path)
1217 hostnames = ','.join(entry.host.hostname
1218 for entry in self.queue_entries
1219 if not entry.is_hostless())
1220
1221 execution_tag = self.queue_entries[0].execution_tag()
1222 params = _autoserv_command_line(
1223 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001224 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001225 _drone_manager.absolute_path(control_path)],
1226 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001227 if self.job.is_image_update_job():
1228 params += ['--image', self.job.update_image_path]
1229
jamesrenc44ae992010-02-19 00:12:54 +00001230 return params
showardd1195652009-12-08 22:21:02 +00001231
1232
1233 @property
1234 def num_processes(self):
1235 return len(self.queue_entries)
1236
1237
1238 @property
1239 def owner_username(self):
1240 return self.job.owner
1241
1242
1243 def _working_directory(self):
1244 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001245
1246
jadmanski0afbb632008-06-06 21:10:57 +00001247 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001248 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001249 keyval_dict = self.job.keyval_dict()
1250 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001251 group_name = self.queue_entries[0].get_group_name()
1252 if group_name:
1253 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001254 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001255 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001256 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001257 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001258
1259
showard35162b02009-03-03 02:17:30 +00001260 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001261 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001262 _drone_manager.write_lines_to_file(error_file_path,
1263 [_LOST_PROCESS_ERROR])
1264
1265
showardd3dc1992009-04-22 21:01:40 +00001266 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001267 if not self.monitor:
1268 return
1269
showardd9205182009-04-27 20:09:55 +00001270 self._write_job_finished()
1271
showard35162b02009-03-03 02:17:30 +00001272 if self.monitor.lost_process:
1273 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001274
jadmanskif7fa2cc2008-10-01 14:13:23 +00001275
showardcbd74612008-11-19 21:42:02 +00001276 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001277 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001278 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001279 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001280 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001281
1282
jadmanskif7fa2cc2008-10-01 14:13:23 +00001283 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001284 if not self.monitor or not self.monitor.has_process():
1285 return
1286
jadmanskif7fa2cc2008-10-01 14:13:23 +00001287 # build up sets of all the aborted_by and aborted_on values
1288 aborted_by, aborted_on = set(), set()
1289 for queue_entry in self.queue_entries:
1290 if queue_entry.aborted_by:
1291 aborted_by.add(queue_entry.aborted_by)
1292 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1293 aborted_on.add(t)
1294
1295 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001296 # TODO(showard): this conditional is now obsolete, we just need to leave
1297 # it in temporarily for backwards compatibility over upgrades. delete
1298 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001299 assert len(aborted_by) <= 1
1300 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001301 aborted_by_value = aborted_by.pop()
1302 aborted_on_value = max(aborted_on)
1303 else:
1304 aborted_by_value = 'autotest_system'
1305 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001306
showarda0382352009-02-11 23:36:43 +00001307 self._write_keyval_after_job("aborted_by", aborted_by_value)
1308 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001309
showardcbd74612008-11-19 21:42:02 +00001310 aborted_on_string = str(datetime.datetime.fromtimestamp(
1311 aborted_on_value))
1312 self._write_status_comment('Job aborted by %s on %s' %
1313 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001314
1315
jadmanski0afbb632008-06-06 21:10:57 +00001316 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001317 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001318 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001319 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001320
1321
jadmanski0afbb632008-06-06 21:10:57 +00001322 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001323 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001324 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001325
1326
class QueueTask(AbstractQueueTask):
    """AbstractQueueTask for jobs that run against one or more hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Sanity-check entry/host statuses, then mark hosts Running."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.host.set_status(models.Host.Status.RUNNING)
            # Running a job leaves the host dirty until it is cleaned.
            queue_entry.host.update_field('dirty', 1)


    def _finish_task(self):
        """Move entries to Gathering (crash/log collection) after the job."""
        super(QueueTask, self)._finish_task()

        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
            queue_entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Extend the base autoserv command line with SSP and job options."""
        invocation = super(QueueTask, self)._command_line()
        # Check if server-side packaging is needed.  require_ssp is
        # tri-state (None/True/False): only an explicit False opts out,
        # so compare identity against the False singleton (PEP 8).
        if (_enable_ssp_container and
            self.job.control_type == control_data.CONTROL_TYPE.SERVER and
            self.job.require_ssp is not False):
            invocation += ['--require-ssp']
            keyval_dict = self.job.keyval_dict()
            test_source_build = keyval_dict.get('test_source_build', None)
            if test_source_build:
                invocation += ['--test_source_build', test_source_build]
        if self.job.parent_job_id:
            invocation += ['--parent_job_id', str(self.job.parent_job_id)]
        return invocation + ['--verify_job_repo_url']
Alex Miller9f01d5d2013-08-08 02:26:01 -07001371
1372
class HostlessQueueTask(AbstractQueueTask):
    """AbstractQueueTask for a job with no host (a single hostless entry)."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()

        entry = self.queue_entries[0]
        # A job enters the database in status Starting.  Each scheduler
        # tick tries to launch Starting jobs; limits such as
        # max_hostless_jobs_per_drone can leave them in Starting, while
        # launched jobs move to Running with an autoserv process in a
        # drone.  An entry still in Starting therefore has no process and
        # no execution_subdir yet, so handing it to the parser would raise
        # — skip parsing/log collection in that case.
        if entry.status == models.HostQueueEntry.Status.STARTING:
            return
        entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001400
1401
# Script entry point: run the scheduler's main loop (main() is defined
# earlier in this file).
if __name__ == '__main__':
    main()