blob: 0aee2f6f50e6711eefe36a08d4db26816954eb43 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Richard Barnetteffed1722016-05-18 15:57:22 -07002
3#pylint: disable=C0111
4
mbligh36768f02008-02-22 18:28:33 +00005"""
6Autotest scheduler
7"""
showard909c7a62008-07-15 21:52:38 +00008
Dan Shif6c65bd2014-08-29 16:15:07 -07009import datetime
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -080010import functools
Dan Shif6c65bd2014-08-29 16:15:07 -070011import gc
12import logging
13import optparse
14import os
15import signal
16import sys
17import time
showard402934a2009-12-21 22:20:47 +000018
Alex Miller05d7b4c2013-03-04 07:49:38 -080019import common
showard21baa452008-10-21 00:08:39 +000020from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000021
22import django.db
23
Dan Shiec1d47d2015-02-13 11:38:13 -080024from autotest_lib.client.common_lib import control_data
Prashanth B0e960282014-05-13 19:38:28 -070025from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070026from autotest_lib.client.common_lib import utils
xixuan4de4e742017-01-30 13:31:35 -080027from autotest_lib.frontend.afe import models
Fang Dengc330bee2014-10-21 18:10:55 -070028from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070029from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
30from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070031from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070032from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070033from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000034from autotest_lib.scheduler import scheduler_models
Aviv Keshet6b4e3c22017-05-04 20:14:02 -070035from autotest_lib.scheduler import scheduler_config
Aviv Keshet308e7362013-05-21 14:43:16 -070036from autotest_lib.server import autoserv_utils
Dan Shi114e1722016-01-10 18:12:53 -080037from autotest_lib.server import system_utils
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080038from autotest_lib.server import utils as server_utils
Dan Shicf2e8dd2015-05-07 17:18:48 -070039from autotest_lib.site_utils import metadata_reporter
Dan Shib9144a42014-12-01 16:09:32 -080040from autotest_lib.site_utils import server_manager_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080041
Dan Shi5e2efb72017-02-07 11:40:23 -080042try:
43 from chromite.lib import metrics
44 from chromite.lib import ts_mon_config
45except ImportError:
46 metrics = utils.metrics_mock
47 ts_mon_config = utils.metrics_mock
48
Dan Shicf2e8dd2015-05-07 17:18:48 -070049
# Pid-file prefixes used by utils.write_pid / program_is_alive.
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Default results directory; overwritten with the command-line argument in
# main_without_exception_handling().
RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# has_key() is deprecated (and removed in Python 3); the `in` operator is the
# portable, idiomatic membership test.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level state shared between main(), initialize() and the signal
# handler.  _shutdown is flipped by handle_signal() to stop the main loop.
_db_manager = None
_db = None
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None
Prashanth Bf66d51b2014-05-06 12:42:25 -070080
mbligh36768f02008-02-22 18:28:33 +000081
mbligh83c1e9e2009-05-01 23:10:41 +000082def _site_init_monitor_db_dummy():
83 return {}
84
85
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled with no default named."""
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise scheduler_lib.SchedulerError(
            'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000091
92
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    # Currently the only consistency check is the drone-set configuration.
    _verify_default_drone_set_exists()
96
97
def main():
    """Top-level entry point: run the scheduler, always removing the pid file.

    Wraps main_without_exception_handling() so that any escaping exception is
    logged before it propagates, and the pid file is deleted no matter how we
    exit.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # Deliberate exits (argument errors, scheduler disabled) pass
            # through without being logged as failures.
            raise
        except:
            # The scheduler runs unattended; log everything (including
            # non-Exception raises) before re-raising.
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        # Drop the pid file so a subsequent scheduler instance can start.
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000109
110
def main_without_exception_handling():
    """Parse arguments, initialize the scheduler, and run the tick loop.

    Exits via sys.exit(1) when the scheduler is disabled in the global config.
    On loop exit (shutdown flag or uncaught exception) flushes queued emails
    and disconnects from the drones and the database.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Allow site-specific code to hook in before we start.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # Start the thread to report metadata.
    metadata_reporter.start()

    with ts_mon_config.SetupTsMonGlobalState('autotest_scheduler',
                                             indirect=True):
        try:
            initialize()
            dispatcher = Dispatcher()
            dispatcher.initialize(recover_hosts=options.recover_hosts)
            minimum_tick_sec = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)

            # Main loop: one tick per iteration, padded with sleep so each
            # tick takes at least minimum_tick_sec.  handle_signal() flips
            # _shutdown to end the loop gracefully.
            while not _shutdown:
                start = time.time()
                dispatcher.tick()
                curr_tick_sec = time.time() - start
                if minimum_tick_sec > curr_tick_sec:
                    time.sleep(minimum_tick_sec - curr_tick_sec)
                else:
                    time.sleep(0.0001)
        except server_manager_utils.ServerActionError as e:
            # This error is expected when the server is not in primary status
            # for scheduler role. Thus do not send email for it.
            logging.exception(e)
        except Exception:
            logging.exception('Uncaught exception, terminating monitor_db.')
            metrics.Counter('chromeos/autotest/scheduler/uncaught_exception'
                            ).increment()

    # Orderly teardown: stop metadata reporting, flush mail, then release the
    # drone and database connections.
    metadata_reporter.abort()
    email_manager.manager.send_queued_emails()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000201
202
def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler: request a graceful stop of the main loop.

    Only sets the _shutdown flag; the loop in main_without_exception_handling
    checks it between ticks and exits after the current tick completes.

    @param signum: Received signal number (unused).
    @param frame: Interrupted stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000207
208
def initialize():
    """One-time scheduler startup: pid file, DB, signals, drone manager.

    Aborts (sys.exit(1)) if another monitor_db instance is already alive.
    Populates the module globals _db_manager, _db and (via
    initialize_globals) _drone_manager.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to run two schedulers against the same database.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point the config at the stress-test database instead of production.
        global_config.global_config.override_config_value(
            scheduler_lib.DB_CONFIG_SECTION, 'database',
            'stresstest_autotest_web')

    # If server database is enabled, check if the server has role `scheduler`.
    # If the server does not have scheduler role, exception will be raised and
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    # Hand the drone manager its worker list and the host that stores results.
    drone_list = system_utils.get_drones()
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000248
249
def initialize_globals():
    """Bind the module-level _drone_manager to the singleton instance."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
253
254
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line for running a job in the lab.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - bool - Whether to pass the verbose flag to autoserv.

    @returns The autoserv command line as a list of executable + parameters.
    """
    return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose, in_lab=True)
showard87ba02a2009-04-20 19:37:32 +0000273
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800274def _calls_log_tick_msg(func):
275 """Used to trace functions called by BaseDispatcher.tick."""
276 @functools.wraps(func)
277 def wrapper(self, *args, **kwargs):
278 self._log_tick_msg('Starting %s' % func.__name__)
279 return func(self, *args, **kwargs)
280
281 return wrapper
282
showard87ba02a2009-04-20 19:37:32 +0000283
Simran Basia858a232012-08-21 11:04:37 -0700284class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800285
286
    def __init__(self):
        # Active Agent objects currently being ticked.
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        # Periodic and daily cleanup tasks; both self-throttle via
        # run_cleanup_maybe().
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db, _drone_manager)
        # Indexes from host id / queue-entry id to the set of Agents touching
        # that object; maintained by add_agent_task()/remove_agent().
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        # Throttling state for the periodic garbage-collector stats report.
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        # Debug-logging switches read from the global config.
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)
        self._inline_host_acquisition = (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'inline_host_acquisition', type=bool, default=True))

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if self._inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
322
mbligh36768f02008-02-22 18:28:33 +0000323
    def initialize(self, recover_hosts=True):
        """One-time dispatcher startup: prime cleanups, recover prior state.

        @param recover_hosts: If True, also attempt recovery of dead hosts.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()
        # Execute all actions queued in the cleanup tasks. Scheduler tick will
        # run a refresh task first. If there is any action in the queue, refresh
        # will raise an exception.
        _drone_manager.execute_actions()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000337
338
    # TODO(pprabhu) Drop this metric once tick_times has been verified.
    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/scheduler/tick_durations/tick')
    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.

        Each major phase is wrapped in a breakdown_timer.Step so its duration
        is reported separately; the step ordering is significant (see the
        comment about _run_cleanup below).
        """
        with metrics.RuntimeBreakdownTimer(
            'chromeos/autotest/scheduler/tick_times') as breakdown_timer:
            self._log_tick_msg('New tick')
            system_utils.DroneCache.refresh()

            with breakdown_timer.Step('garbage_collection'):
                self._garbage_collection()
            with breakdown_timer.Step('trigger_refresh'):
                self._log_tick_msg('Starting _drone_manager.trigger_refresh')
                _drone_manager.trigger_refresh()
            with breakdown_timer.Step('schedule_running_host_queue_entries'):
                self._schedule_running_host_queue_entries()
            with breakdown_timer.Step('schedule_special_tasks'):
                self._schedule_special_tasks()
            with breakdown_timer.Step('schedule_new_jobs'):
                self._schedule_new_jobs()
            with breakdown_timer.Step('sync_refresh'):
                self._log_tick_msg('Starting _drone_manager.sync_refresh')
                _drone_manager.sync_refresh()
            # _run_cleanup must be called between drone_manager.sync_refresh,
            # and drone_manager.execute_actions, as sync_refresh will clear the
            # calls queued in drones. Therefore, any action that calls
            # drone.queue_call to add calls to the drone._calls, should be after
            # drone refresh is completed and before
            # drone_manager.execute_actions at the end of the tick.
            with breakdown_timer.Step('run_cleanup'):
                self._run_cleanup()
            with breakdown_timer.Step('find_aborting'):
                self._find_aborting()
            with breakdown_timer.Step('find_aborted_special_tasks'):
                self._find_aborted_special_tasks()
            with breakdown_timer.Step('handle_agents'):
                self._handle_agents()
            with breakdown_timer.Step('host_scheduler_tick'):
                self._log_tick_msg('Starting _host_scheduler.tick')
                self._host_scheduler.tick()
            with breakdown_timer.Step('drones_execute_actions'):
                self._log_tick_msg('Starting _drone_manager.execute_actions')
                _drone_manager.execute_actions()
            with breakdown_timer.Step('send_queued_emails'):
                self._log_tick_msg(
                        'Starting email_manager.manager.send_queued_emails')
                email_manager.manager.send_queued_emails()
            with breakdown_timer.Step('db_reset_queries'):
                self._log_tick_msg('Starting django.db.reset_queries')
                django.db.reset_queries()

            self._tick_count += 1
            metrics.Counter('chromeos/autotest/scheduler/tick').increment()
mbligh36768f02008-02-22 18:28:33 +0000397
showard97aed502008-11-04 02:01:24 +0000398
    @_calls_log_tick_msg
    def _run_cleanup(self):
        """Give both cleanup tasks a chance to run; each self-throttles."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000403
mbligh36768f02008-02-22 18:28:33 +0000404
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800405 @_calls_log_tick_msg
showardf13a9e22009-12-18 22:54:09 +0000406 def _garbage_collection(self):
407 threshold_time = time.time() - self._seconds_between_garbage_stats
408 if threshold_time < self._last_garbage_stats_time:
409 # Don't generate these reports very often.
410 return
411
412 self._last_garbage_stats_time = time.time()
413 # Force a full level 0 collection (because we can, it doesn't hurt
414 # at this interval).
415 gc.collect()
416 logging.info('Logging garbage collector stats on tick %d.',
417 self._tick_count)
418 gc_stats._log_garbage_collector_stats()
419
420
showard170873e2009-01-07 00:22:26 +0000421 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
422 for object_id in object_ids:
423 agent_dict.setdefault(object_id, set()).add(agent)
424
425
426 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
427 for object_id in object_ids:
428 assert object_id in agent_dict
429 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700430 # If an ID has no more active agent associated, there is no need to
431 # keep it in the dictionary. Otherwise, scheduler will keep an
432 # unnecessarily big dictionary until being restarted.
433 if not agent_dict[object_id]:
434 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000435
436
showardd1195652009-12-08 22:21:02 +0000437 def add_agent_task(self, agent_task):
beeps8bb1f7d2013-08-05 01:30:09 -0700438 """
439 Creates and adds an agent to the dispatchers list.
440
441 In creating the agent we also pass on all the queue_entry_ids and
442 host_ids from the special agent task. For every agent we create, we
443 add it to 1. a dict against the queue_entry_ids given to it 2. A dict
444 against the host_ids given to it. So theoritically, a host can have any
445 number of agents associated with it, and each of them can have any
446 special agent task, though in practice we never see > 1 agent/task per
447 host at any time.
448
449 @param agent_task: A SpecialTask for the agent to manage.
450 """
showardd1195652009-12-08 22:21:02 +0000451 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000452 self._agents.append(agent)
453 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000454 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
455 self._register_agent_for_ids(self._queue_entry_agents,
456 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000457
showard170873e2009-01-07 00:22:26 +0000458
459 def get_agents_for_entry(self, queue_entry):
460 """
461 Find agents corresponding to the specified queue_entry.
462 """
showardd3dc1992009-04-22 21:01:40 +0000463 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000464
465
466 def host_has_agent(self, host):
467 """
468 Determine if there is currently an Agent present using this host.
469 """
470 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000471
472
jadmanski0afbb632008-06-06 21:10:57 +0000473 def remove_agent(self, agent):
474 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000475 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
476 agent)
477 self._unregister_agent_for_ids(self._queue_entry_agents,
478 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000479
480
showard8cc058f2009-09-08 16:26:33 +0000481 def _host_has_scheduled_special_task(self, host):
482 return bool(models.SpecialTask.objects.filter(host__id=host.id,
483 is_active=False,
484 is_complete=False))
485
486
    def _recover_processes(self):
        """Reattach to autoserv processes left running by the prior scheduler.

        Rebuilds AgentTasks from the database, registers their pidfiles,
        refreshes drone state, then recovers tasks and pending/verifying
        entries before reverifying whatever hosts remain unaccounted for.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        # Pidfiles must be registered before refresh() so the drone manager
        # collects their contents in the same pass.
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000499
showard170873e2009-01-07 00:22:26 +0000500
showardd1195652009-12-08 22:21:02 +0000501 def _create_recovery_agent_tasks(self):
502 return (self._get_queue_entry_agent_tasks()
503 + self._get_special_task_agent_tasks(is_active=True))
504
505
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        Also reports a per-status gauge of active host queue entries as a side
        effect.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        hqe_count_by_status = {}
        for entry in queue_entries:
            hqe_count_by_status[entry.status] = (
                hqe_count_by_status.get(entry.status, 0) + 1)
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            # A multi-entry (synchronous) task claims all of its group's
            # entries at once; mark them so they are not scheduled twice.
            used_queue_entries.update(agent_task.queue_entries)

        for status, count in hqe_count_by_status.iteritems():
            metrics.Gauge(
                'chromeos/autotest/scheduler/active_host_queue_entries'
            ).set(count, fields={'status': status})

        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000551
552
showardd1195652009-12-08 22:21:02 +0000553 def _get_special_task_agent_tasks(self, is_active=False):
554 special_tasks = models.SpecialTask.objects.filter(
555 is_active=is_active, is_complete=False)
556 return [self._get_agent_task_for_special_task(task)
557 for task in special_tasks]
558
559
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        Dispatches on the entry's status; raises SchedulerError for any status
        this method does not know how to run.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        """
        # A synchronous job groups several entries; the task runs them all.
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000585
586
587 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000588 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
589 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000590 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000591 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000592 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000593 if using_host:
showardd1195652009-12-08 22:21:02 +0000594 self._assert_host_has_no_agent(task_entry)
595
596
597 def _assert_host_has_no_agent(self, entry):
598 """
599 @param entry: a HostQueueEntry or a SpecialTask
600 """
601 if self.host_has_agent(entry.host):
602 agent = tuple(self._host_agents.get(entry.host.id))[0]
Prashanth B0e960282014-05-13 19:38:28 -0700603 raise scheduler_lib.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000604 'While scheduling %s, host %s already has a host agent %s'
605 % (entry, entry.host, agent.task))
606
607
    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
        object contains a hqe it's passed on to the special agent task, which
        creates a HostQueueEntry and saves it as it's queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        @raises SchedulerError: if no AgentTask class matches the task type.
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        # Dispatch on each candidate class's TASK_TYPE tag.
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise scheduler_lib.SchedulerError(
                'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000638
639
640 def _register_pidfiles(self, agent_tasks):
641 for agent_task in agent_tasks:
642 agent_task.register_necessary_pidfiles()
643
644
645 def _recover_tasks(self, agent_tasks):
646 orphans = _drone_manager.get_orphaned_autoserv_processes()
647
648 for agent_task in agent_tasks:
649 agent_task.recover()
650 if agent_task.monitor and agent_task.monitor.has_process():
651 orphans.discard(agent_task.monitor.get_process())
652 self.add_agent_task(agent_task)
653
654 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000655
656
showard8cc058f2009-09-08 16:26:33 +0000657 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000658 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
659 % status):
showard0db3d432009-10-12 20:29:15 +0000660 if entry.status == status and not self.get_agents_for_entry(entry):
661 # The status can change during iteration, e.g., if job.run()
662 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000663 yield entry
664
665
showard6878e8b2009-07-20 22:37:45 +0000666 def _check_for_remaining_orphan_processes(self, orphans):
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800667 m = 'chromeos/autotest/errors/unrecovered_orphan_processes'
668 metrics.Gauge(m).set(len(orphans))
669
showard6878e8b2009-07-20 22:37:45 +0000670 if not orphans:
671 return
672 subject = 'Unrecovered orphan autoserv processes remain'
673 message = '\n'.join(str(process) for process in orphans)
mbligh5fa9e112009-08-03 16:46:06 +0000674 die_on_orphans = global_config.global_config.get_config_value(
675 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
676
677 if die_on_orphans:
678 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000679
showard170873e2009-01-07 00:22:26 +0000680
showard8cc058f2009-09-08 16:26:33 +0000681 def _recover_pending_entries(self):
682 for entry in self._get_unassigned_entries(
683 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000684 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000685 entry.on_pending()
686
687
showardb8900452009-10-12 20:31:01 +0000688 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000689 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000690 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
691 unrecovered_hqes = []
692 for queue_entry in queue_entries:
693 special_tasks = models.SpecialTask.objects.filter(
694 task__in=(models.SpecialTask.Task.CLEANUP,
695 models.SpecialTask.Task.VERIFY),
696 queue_entry__id=queue_entry.id,
697 is_complete=False)
698 if special_tasks.count() == 0:
699 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000700
showardb8900452009-10-12 20:31:01 +0000701 if unrecovered_hqes:
702 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700703 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000704 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000705 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000706
707
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800708 @_calls_log_tick_msg
showard65db3932009-10-28 19:54:35 +0000709 def _schedule_special_tasks(self):
710 """
711 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700712
713 Special tasks include PreJobTasks like verify, reset and cleanup.
714 They are created through _schedule_new_jobs and associated with a hqe
715 This method translates SpecialTasks to the appropriate AgentTask and
716 adds them to the dispatchers agents list, so _handle_agents can execute
717 them.
showard65db3932009-10-28 19:54:35 +0000718 """
Prashanth B4ec98672014-05-15 10:44:54 -0700719 # When the host scheduler is responsible for acquisition we only want
720 # to run tasks with leased hosts. All hqe tasks will already have
721 # leased hosts, and we don't want to run frontend tasks till the host
722 # scheduler has vetted the assignment. Note that this doesn't include
723 # frontend tasks with hosts leased by other active hqes.
724 for task in self._job_query_manager.get_prioritized_special_tasks(
Prathmesh Prabhu468c32c2016-12-20 15:12:30 -0800725 only_tasks_with_leased_hosts=not self._inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000726 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000727 continue
showardd1195652009-12-08 22:21:02 +0000728 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000729
730
showard170873e2009-01-07 00:22:26 +0000731 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000732 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000733 # should never happen
showarded2afea2009-07-07 20:54:07 +0000734 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000735 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000736 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700737 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000738 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000739
740
jadmanski0afbb632008-06-06 21:10:57 +0000741 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000742 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700743 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000744 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000745 if self.host_has_agent(host):
746 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000747 continue
showard8cc058f2009-09-08 16:26:33 +0000748 if self._host_has_scheduled_special_task(host):
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700749 # host will have a special task scheduled on the next tick
showard8cc058f2009-09-08 16:26:33 +0000750 continue
showard170873e2009-01-07 00:22:26 +0000751 if print_message:
showardb18134f2009-03-20 20:52:18 +0000752 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000753 models.SpecialTask.objects.create(
754 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000755 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000756
757
jadmanski0afbb632008-06-06 21:10:57 +0000758 def _recover_hosts(self):
759 # recover "Repair Failed" hosts
760 message = 'Reverifying dead host %s'
761 self._reverify_hosts_where("status = 'Repair Failed'",
762 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000763
764
showard89f84db2009-03-12 20:39:13 +0000765 def _refresh_pending_queue_entries(self):
766 """
767 Lookup the pending HostQueueEntries and call our HostScheduler
768 refresh() method given that list. Return the list.
769
770 @returns A list of pending HostQueueEntries sorted in priority order.
771 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700772 queue_entries = self._job_query_manager.get_pending_queue_entries(
Prathmesh Prabhu468c32c2016-12-20 15:12:30 -0800773 only_hostless=not self._inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000774 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000775 return []
showard89f84db2009-03-12 20:39:13 +0000776 return queue_entries
777
778
showarda9545c02009-12-18 22:44:26 +0000779 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800780 """Schedule a hostless (suite) job.
781
782 @param queue_entry: The queue_entry representing the hostless job.
783 """
showarda9545c02009-12-18 22:44:26 +0000784 self.add_agent_task(HostlessQueueTask(queue_entry))
Jakob Juelichd615a1e2014-09-04 11:48:36 -0700785
786 # Need to set execution_subdir before setting the status:
787 # After a restart of the scheduler, agents will be restored for HQEs in
788 # Starting, Running, Gathering, Parsing or Archiving. To do this, the
789 # execution_subdir is needed. Therefore it must be set before entering
790 # one of these states.
791 # Otherwise, if the scheduler was interrupted between setting the status
792 # and the execution_subdir, upon it's restart restoring agents would
793 # fail.
794 # Is there a way to get a status in one of these states without going
795 # through this code? Following cases are possible:
796 # - If it's aborted before being started:
797 # active bit will be 0, so there's nothing to parse, it will just be
798 # set to completed by _find_aborting. Critical statuses are skipped.
799 # - If it's aborted or it fails after being started:
800 # It was started, so this code was executed.
801 queue_entry.update_field('execution_subdir', 'hostless')
jamesren47bd7372010-03-13 00:58:17 +0000802 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000803
804
beepscc9fc702013-12-02 12:45:38 -0800805 def _schedule_host_job(self, host, queue_entry):
806 """Schedules a job on the given host.
807
808 1. Assign the host to the hqe, if it isn't already assigned.
809 2. Create a SpecialAgentTask for the hqe.
810 3. Activate the hqe.
811
812 @param queue_entry: The job to schedule.
813 @param host: The host to schedule the job on.
814 """
815 if self.host_has_agent(host):
816 host_agent_task = list(self._host_agents.get(host.id))[0].task
beepscc9fc702013-12-02 12:45:38 -0800817 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700818 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800819
820
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800821 @_calls_log_tick_msg
showard89f84db2009-03-12 20:39:13 +0000822 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700823 """
824 Find any new HQEs and call schedule_pre_job_tasks for it.
825
826 This involves setting the status of the HQE and creating a row in the
827 db corresponding the the special task, through
828 scheduler_models._queue_special_task. The new db row is then added as
829 an agent to the dispatcher through _schedule_special_tasks and
830 scheduled for execution on the drone through _handle_agents.
831 """
showard89f84db2009-03-12 20:39:13 +0000832 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000833
beepscc9fc702013-12-02 12:45:38 -0800834 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700835 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700836 new_jobs_with_hosts = 0
837 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800838 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700839 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000840
beepscc9fc702013-12-02 12:45:38 -0800841 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000842 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000843 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700844 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000845 else:
beepscc9fc702013-12-02 12:45:38 -0800846 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700847 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700848
Prathmesh Prabhued7ece92016-11-23 11:19:43 -0800849 metrics.Counter(
850 'chromeos/autotest/scheduler/scheduled_jobs_hostless'
851 ).increment_by(new_hostless_jobs)
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800852
beepscc9fc702013-12-02 12:45:38 -0800853 if not host_jobs:
854 return
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800855
Prathmesh Prabhu468c32c2016-12-20 15:12:30 -0800856 if not self._inline_host_acquisition:
Prathmesh Prabhu3c6a3bf2016-12-20 11:29:02 -0800857 # In this case, host_scheduler is responsible for scheduling
858 # host_jobs. Scheduling the jobs ourselves can lead to DB corruption
859 # since host_scheduler assumes it is the single process scheduling
860 # host jobs.
861 metrics.Gauge(
862 'chromeos/autotest/errors/scheduler/unexpected_host_jobs').set(
863 len(host_jobs))
864 return
865
Prashanth Bf66d51b2014-05-06 12:42:25 -0700866 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
867 for host_assignment in jobs_with_hosts:
868 self._schedule_host_job(host_assignment.host, host_assignment.job)
869 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800870
Prathmesh Prabhued7ece92016-11-23 11:19:43 -0800871 metrics.Counter(
872 'chromeos/autotest/scheduler/scheduled_jobs_with_hosts'
873 ).increment_by(new_jobs_with_hosts)
874 # TODO(pprabhu): Decide what to do about this metric. Million dollar
875 # question: What happens to jobs that were not matched. Do they stay in
876 # the queue, and get processed right here in the next tick (then we want
877 # a guage corresponding to the number of outstanding unmatched host
878 # jobs), or are they handled somewhere else (then we need a counter
879 # corresponding to failed_to_match_with_hosts jobs).
880 #autotest_stats.Gauge(key).send('new_jobs_without_hosts',
881 # new_jobs_need_hosts -
882 # new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000883
884
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800885 @_calls_log_tick_msg
showard8cc058f2009-09-08 16:26:33 +0000886 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700887 """
888 Adds agents to the dispatcher.
889
890 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
891 QueueTask for example, will have a job with a control file, and
892 the agent will have methods that poll, abort and check if the queue
893 task is finished. The dispatcher runs the agent_task, as well as
894 other agents in it's _agents member, through _handle_agents, by
895 calling the Agents tick().
896
897 This method creates an agent for each HQE in one of (starting, running,
898 gathering, parsing, archiving) states, and adds it to the dispatcher so
899 it is handled by _handle_agents.
900 """
showardd1195652009-12-08 22:21:02 +0000901 for agent_task in self._get_queue_entry_agent_tasks():
902 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000903
904
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800905 @_calls_log_tick_msg
jadmanski0afbb632008-06-06 21:10:57 +0000906 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700907 """
908 Looks through the afe_host_queue_entries for an aborted entry.
909
910 The aborted bit is set on an HQE in many ways, the most common
911 being when a user requests an abort through the frontend, which
912 results in an rpc from the afe to abort_host_queue_entries.
913 """
jamesrene7c65cb2010-06-08 20:38:10 +0000914 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000915 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700916 where='aborted=1 and complete=0'):
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800917
918 # If the job is running on a shard, let the shard handle aborting
919 # it and sync back the right status.
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -0800920 if entry.job.shard_id is not None and not server_utils.is_shard():
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800921 logging.info('Waiting for shard %s to abort hqe %s',
922 entry.job.shard_id, entry)
923 continue
924
showardf4a2e502009-07-28 20:06:39 +0000925 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800926
927 # The task would have started off with both is_complete and
928 # is_active = False. Aborted tasks are neither active nor complete.
929 # For all currently active tasks this will happen through the agent,
930 # but we need to manually update the special tasks that haven't
931 # started yet, because they don't have agents.
932 models.SpecialTask.objects.filter(is_active=False,
933 queue_entry_id=entry.id).update(is_complete=True)
934
showardd3dc1992009-04-22 21:01:40 +0000935 for agent in self.get_agents_for_entry(entry):
936 agent.abort()
937 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000938 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700939 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000940 for job in jobs_to_stop:
941 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000942
943
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800944 @_calls_log_tick_msg
beeps8bb1f7d2013-08-05 01:30:09 -0700945 def _find_aborted_special_tasks(self):
946 """
947 Find SpecialTasks that have been marked for abortion.
948
949 Poll the database looking for SpecialTasks that are active
950 and have been marked for abortion, then abort them.
951 """
952
953 # The completed and active bits are very important when it comes
954 # to scheduler correctness. The active bit is set through the prolog
955 # of a special task, and reset through the cleanup method of the
956 # SpecialAgentTask. The cleanup is called both through the abort and
957 # epilog. The complete bit is set in several places, and in general
958 # a hanging job will have is_active=1 is_complete=0, while a special
959 # task which completed will have is_active=0 is_complete=1. To check
960 # aborts we directly check active because the complete bit is set in
961 # several places, including the epilog of agent tasks.
962 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
963 is_aborted=True)
964 for task in aborted_tasks:
965 # There are 2 ways to get the agent associated with a task,
966 # through the host and through the hqe. A special task
967 # always needs a host, but doesn't always need a hqe.
968 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700969 if isinstance(agent.task, agent_task.SpecialAgentTask):
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000970
beeps8bb1f7d2013-08-05 01:30:09 -0700971 # The epilog preforms critical actions such as
972 # queueing the next SpecialTask, requeuing the
973 # hqe etc, however it doesn't actually kill the
974 # monitor process and set the 'done' bit. Epilogs
975 # assume that the job failed, and that the monitor
976 # process has already written an exit code. The
977 # done bit is a necessary condition for
978 # _handle_agents to schedule any more special
979 # tasks against the host, and it must be set
980 # in addition to is_active, is_complete and success.
981 agent.task.epilog()
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000982 agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700983
984
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700985 def _can_start_agent(self, agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000986 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000987 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000988 return True
989 # don't allow any nonzero-process agents to run after we've reached a
990 # limit (this avoids starvation of many-process agents)
991 if have_reached_limit:
992 return False
993 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000994 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000995 agent.task.owner_username,
996 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000997 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000998 return False
showard4c5374f2008-09-04 17:02:56 +0000999 return True
1000
1001
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -08001002 @_calls_log_tick_msg
jadmanski0afbb632008-06-06 21:10:57 +00001003 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -07001004 """
1005 Handles agents of the dispatcher.
1006
1007 Appropriate Agents are added to the dispatcher through
1008 _schedule_running_host_queue_entries. These agents each
1009 have a task. This method runs the agents task through
1010 agent.tick() leading to:
1011 agent.start
1012 prolog -> AgentTasks prolog
1013 For each queue entry:
1014 sets host status/status to Running
1015 set started_on in afe_host_queue_entries
1016 run -> AgentTasks run
1017 Creates PidfileRunMonitor
1018 Queues the autoserv command line for this AgentTask
1019 via the drone manager. These commands are executed
1020 through the drone managers execute actions.
1021 poll -> AgentTasks/BaseAgentTask poll
1022 checks the monitors exit_code.
1023 Executes epilog if task is finished.
1024 Executes AgentTasks _finish_task
1025 finish_task is usually responsible for setting the status
1026 of the HQE/host, and updating it's active and complete fileds.
1027
1028 agent.is_done
1029 Removed the agent from the dispatchers _agents queue.
1030 Is_done checks the finished bit on the agent, that is
1031 set based on the Agents task. During the agents poll
1032 we check to see if the monitor process has exited in
1033 it's finish method, and set the success member of the
1034 task based on this exit code.
1035 """
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001036 num_started_this_tick = 0
1037 num_finished_this_tick = 0
showard4c5374f2008-09-04 17:02:56 +00001038 have_reached_limit = False
1039 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -07001040 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +00001041 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -07001042 self._log_extra_msg('Processing Agent with Host Ids: %s and '
1043 'queue_entry ids:%s' % (agent.host_ids,
1044 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +00001045 if not agent.started:
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001046 if not self._can_start_agent(agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001047 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -07001048 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +00001049 continue
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001050 num_started_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001051 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +00001052 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -07001053 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +00001054 if agent.is_done():
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001055 num_finished_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001056 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +00001057 self.remove_agent(agent)
Prathmesh Prabhued7ece92016-11-23 11:19:43 -08001058
1059 metrics.Counter(
1060 'chromeos/autotest/scheduler/agent_processes_started'
1061 ).increment_by(num_started_this_tick)
1062 metrics.Counter(
1063 'chromeos/autotest/scheduler/agent_processes_finished'
1064 ).increment_by(num_finished_this_tick)
1065 num_agent_processes = _drone_manager.total_running_processes()
1066 metrics.Gauge(
1067 'chromeos/autotest/scheduler/agent_processes'
1068 ).set(num_agent_processes)
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001069 logging.info('%d running processes. %d added this tick.',
Prathmesh Prabhued7ece92016-11-23 11:19:43 -08001070 num_agent_processes, num_started_this_tick)
mbligh36768f02008-02-22 18:28:33 +00001071
1072
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -08001073 def _log_tick_msg(self, msg):
1074 if self._tick_debug:
1075 logging.debug(msg)
1076
1077
1078 def _log_extra_msg(self, msg):
1079 if self._extra_debugging:
1080 logging.debug(msg)
1081
1082
Simran Basia858a232012-08-21 11:04:37 -07001083SiteDispatcher = utils.import_site_class(
1084 __file__, 'autotest_lib.scheduler.site_monitor_db',
1085 'SiteDispatcher', BaseDispatcher)
1086
1087class Dispatcher(SiteDispatcher):
1088 pass
1089
1090
mbligh36768f02008-02-22 18:28:33 +00001091class Agent(object):
showard77182562009-06-10 00:16:05 +00001092 """
Alex Miller47715eb2013-07-24 03:34:01 -07001093 An agent for use by the Dispatcher class to perform a task. An agent wraps
1094 around an AgentTask mainly to associate the AgentTask with the queue_entry
1095 and host ids.
showard77182562009-06-10 00:16:05 +00001096
1097 The following methods are required on all task objects:
1098 poll() - Called periodically to let the task check its status and
1099 update its internal state. If the task succeeded.
1100 is_done() - Returns True if the task is finished.
1101 abort() - Called when an abort has been requested. The task must
1102 set its aborted attribute to True if it actually aborted.
1103
1104 The following attributes are required on all task objects:
1105 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001106 success - bool, True if this task succeeded.
1107 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1108 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001109 """
1110
1111
showard418785b2009-11-23 20:19:59 +00001112 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001113 """
Alex Miller47715eb2013-07-24 03:34:01 -07001114 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001115 """
showard8cc058f2009-09-08 16:26:33 +00001116 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001117
showard77182562009-06-10 00:16:05 +00001118 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001119 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001120
showard8cc058f2009-09-08 16:26:33 +00001121 self.queue_entry_ids = task.queue_entry_ids
1122 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001123
showard8cc058f2009-09-08 16:26:33 +00001124 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001125 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001126
1127
jadmanski0afbb632008-06-06 21:10:57 +00001128 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001129 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001130 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001131 self.task.poll()
1132 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001133 self.finished = True
showardec113162008-05-08 00:52:49 +00001134
1135
jadmanski0afbb632008-06-06 21:10:57 +00001136 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001137 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001138
1139
showardd3dc1992009-04-22 21:01:40 +00001140 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001141 if self.task:
1142 self.task.abort()
1143 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001144 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001145 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001146
showardd3dc1992009-04-22 21:01:40 +00001147
beeps5e2bb4a2013-10-28 11:26:45 -07001148class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001149 """
1150 Common functionality for QueueTask and HostlessQueueTask
1151 """
1152 def __init__(self, queue_entries):
1153 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001154 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001155 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001156
1157
showard73ec0442009-02-07 02:05:20 +00001158 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001159 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001160
1161
jamesrenc44ae992010-02-19 00:12:54 +00001162 def _write_control_file(self, execution_path):
1163 control_path = _drone_manager.attach_file_to_execution(
1164 execution_path, self.job.control_file)
1165 return control_path
1166
1167
Aviv Keshet308e7362013-05-21 14:43:16 -07001168 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001169 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001170 execution_path = self.queue_entries[0].execution_path()
1171 control_path = self._write_control_file(execution_path)
1172 hostnames = ','.join(entry.host.hostname
1173 for entry in self.queue_entries
1174 if not entry.is_hostless())
1175
1176 execution_tag = self.queue_entries[0].execution_tag()
1177 params = _autoserv_command_line(
1178 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001179 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001180 _drone_manager.absolute_path(control_path)],
1181 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001182 if self.job.is_image_update_job():
1183 params += ['--image', self.job.update_image_path]
1184
jamesrenc44ae992010-02-19 00:12:54 +00001185 return params
showardd1195652009-12-08 22:21:02 +00001186
1187
    @property
    def num_processes(self):
        # One autoserv process is accounted for per queue entry in this task.
        return len(self.queue_entries)
1191
1192
    @property
    def owner_username(self):
        # Used for per-owner process throttling (see
        # _drone_manager.max_runnable_processes callers).
        return self.job.owner
1196
1197
    def _working_directory(self):
        # Shared execution path across this task's queue entries.
        return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001200
1201
jadmanski0afbb632008-06-06 21:10:57 +00001202 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001203 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001204 keyval_dict = self.job.keyval_dict()
1205 keyval_dict[queued_key] = queued_time
showardf1ae3542009-05-11 19:26:02 +00001206 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001207 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001208 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001209 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001210
1211
showard35162b02009-03-03 02:17:30 +00001212 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001213 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001214 _drone_manager.write_lines_to_file(error_file_path,
1215 [_LOST_PROCESS_ERROR])
1216
1217
showardd3dc1992009-04-22 21:01:40 +00001218 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001219 if not self.monitor:
1220 return
1221
showardd9205182009-04-27 20:09:55 +00001222 self._write_job_finished()
1223
showard35162b02009-03-03 02:17:30 +00001224 if self.monitor.lost_process:
1225 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001226
jadmanskif7fa2cc2008-10-01 14:13:23 +00001227
showardcbd74612008-11-19 21:42:02 +00001228 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001229 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001230 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001231 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001232 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001233
1234
jadmanskif7fa2cc2008-10-01 14:13:23 +00001235 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001236 if not self.monitor or not self.monitor.has_process():
1237 return
1238
jadmanskif7fa2cc2008-10-01 14:13:23 +00001239 # build up sets of all the aborted_by and aborted_on values
1240 aborted_by, aborted_on = set(), set()
1241 for queue_entry in self.queue_entries:
1242 if queue_entry.aborted_by:
1243 aborted_by.add(queue_entry.aborted_by)
1244 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1245 aborted_on.add(t)
1246
1247 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001248 # TODO(showard): this conditional is now obsolete, we just need to leave
1249 # it in temporarily for backwards compatibility over upgrades. delete
1250 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001251 assert len(aborted_by) <= 1
1252 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001253 aborted_by_value = aborted_by.pop()
1254 aborted_on_value = max(aborted_on)
1255 else:
1256 aborted_by_value = 'autotest_system'
1257 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001258
showarda0382352009-02-11 23:36:43 +00001259 self._write_keyval_after_job("aborted_by", aborted_by_value)
1260 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001261
showardcbd74612008-11-19 21:42:02 +00001262 aborted_on_string = str(datetime.datetime.fromtimestamp(
1263 aborted_on_value))
1264 self._write_status_comment('Job aborted by %s on %s' %
1265 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001266
1267
    def abort(self):
        """Abort this task, recording the abort before final bookkeeping.

        Order matters: _log_abort() writes a status comment paired with the
        monitor's process, then _finish_task() writes the job-finished keyvals
        (and any lost-process error file).
        """
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()
showard21baa452008-10-21 00:08:39 +00001272
1273
    def epilog(self):
        """Run the base-class epilog, then write final job bookkeeping."""
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001277
1278
class QueueTask(AbstractQueueTask):
    """Queue task that runs autoserv against one or more hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)
        # Whether server-side packaging jobs may run in an SSP container.
        config = global_config.global_config
        self._enable_ssp_container = config.get_config_value(
                'AUTOSERV', 'enable_ssp_container', type=bool, default=True)


    def prolog(self):
        """Verify entry/host states, then mark hosts running and dirty."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)


    def _finish_task(self):
        """After base bookkeeping, move entries to GATHERING, hosts to RUNNING."""
        super(QueueTask, self)._finish_task()

        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Extend the base autoserv invocation with SSP and job-keyval flags."""
        cmd = super(QueueTask, self)._command_line()
        # Check if server-side packaging is needed.  NOTE: require_ssp may be
        # None (meaning "use the default"), so the comparison against False is
        # deliberate and must not be simplified to a truthiness test.
        if (self._enable_ssp_container and
            self.job.control_type == control_data.CONTROL_TYPE.SERVER and
            self.job.require_ssp != False):
            cmd += ['--require-ssp']
            test_source_build = self.job.keyval_dict().get(
                    'test_source_build', None)
            if test_source_build:
                cmd += ['--test_source_build', test_source_build]
        if self.job.parent_job_id:
            cmd += ['--parent_job_id', str(self.job.parent_job_id)]
        return cmd + ['--verify_job_repo_url']
Alex Miller9f01d5d2013-08-08 02:26:01 -07001327
1328
class HostlessQueueTask(AbstractQueueTask):
    """Queue task for a server job that holds no hosts (a hostless HQE)."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()

        # When a job is added to database, its initial status is always
        # Starting. In a scheduler tick, scheduler finds all jobs in Starting
        # status, check if any of them can be started. If scheduler hits some
        # limit, e.g., max_hostless_jobs_per_drone, scheduler will
        # leave these jobs in Starting status. Otherwise, the jobs'
        # status will be changed to Running, and an autoserv process
        # will be started in drone for each of these jobs.
        # If the entry is still in status Starting, the process has not started
        # yet. Therefore, there is no need to parse and collect log. Without
        # this check, exception will be raised by scheduler as execution_subdir
        # for this queue entry does not have a value yet.
        entry = self.queue_entries[0]
        if entry.status != models.HostQueueEntry.Status.STARTING:
            entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001356
1357
# Script entry point: run the scheduler's main() (defined earlier in this
# file) only when executed directly, not when imported as a module.
if __name__ == '__main__':
    main()