#!/usr/bin/python
#pylint: disable=C0111

"""
Autotest scheduler
"""

import datetime
import functools
import gc
import logging
import optparse
import os
import signal
import sys
import time

import common
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import agent_task, drone_manager
from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
from autotest_lib.scheduler import postjob_task
from autotest_lib.scheduler import query_managers
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import scheduler_config
from autotest_lib.server import autoserv_utils
from autotest_lib.server import system_utils
from autotest_lib.server import utils as server_utils
from autotest_lib.site_utils import metadata_reporter
from autotest_lib.site_utils import server_manager_utils

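# If chromite is unavailable (e.g. when running outside the Chrome OS
# chroot), fall back to the mock metrics objects so the scheduler can still
# run without the ts_mon monitoring infrastructure.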
try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    metrics = utils.metrics_mock
    ts_mon_config = utils.metrics_mock


PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# Error message to leave in the results dir when an autoserv process
# disappears mysteriously.
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db_manager = None
_db = None
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise scheduler_lib.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler."""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that the scheduler is running in a '
                            'production environment and may use a database '
                            'that is not hosted on localhost. If it is set '
                            'to False, the scheduler will fail if the '
                            'database is not on localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # Start the thread to report metadata.
    metadata_reporter.start()

    with ts_mon_config.SetupTsMonGlobalState('autotest_scheduler',
                                             indirect=True):
        try:
            initialize()
            dispatcher = Dispatcher()
            dispatcher.initialize(recover_hosts=options.recover_hosts)
            minimum_tick_sec = global_config.global_config.get_config_value(
                    scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)

            while not _shutdown:
                start = time.time()
                dispatcher.tick()
                curr_tick_sec = time.time() - start
                if minimum_tick_sec > curr_tick_sec:
                    time.sleep(minimum_tick_sec - curr_tick_sec)
                else:
                    time.sleep(0.0001)
        except server_manager_utils.ServerActionError as e:
            # This error is expected when the server is not in primary status
            # for the scheduler role. Thus do not send email for it.
            logging.exception(e)
        except Exception:
            logging.exception('Uncaught exception, terminating monitor_db.')
            metrics.Counter('chromeos/autotest/scheduler/uncaught_exception'
                            ).increment()

    metadata_reporter.abort()
    email_manager.manager.send_queued_emails()
    _drone_manager.shutdown()
    _db_manager.disconnect()


def handle_signal(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    # If the server database is enabled, check that this server has the
    # `scheduler` role; if it does not, an exception is raised and the
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    drone_list = system_utils.get_drones()
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to look up the Job object.
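
    A minimal usage sketch (hypothetical arguments; the exact flags are
    assembled by autoserv_utils.autoserv_run_job_command):

        command = _autoserv_command_line('host1,host2', extra_args=[])
        # 'command' is a list: the autoserv executable followed by its
        # arguments, including the '-m host1,host2' machines flag.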
257 """
Simran Basi1bf60eb2015-12-01 16:39:29 -0800258 command = autoserv_utils.autoserv_run_job_command(_autoserv_directory,
Aviv Keshet308e7362013-05-21 14:43:16 -0700259 machines, results_directory=drone_manager.WORKING_DIRECTORY,
260 extra_args=extra_args, job=job, queue_entry=queue_entry,
Simran Basi1bf60eb2015-12-01 16:39:29 -0800261 verbose=verbose, in_lab=True)
262 return command
showard87ba02a2009-04-20 19:37:32 +0000263
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800264def _calls_log_tick_msg(func):
Allen Li7bea6a02017-02-06 17:01:20 -0800265 """Used to trace functions called by Dispatcher.tick."""
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800266 @functools.wraps(func)
267 def wrapper(self, *args, **kwargs):
268 self._log_tick_msg('Starting %s' % func.__name__)
269 return func(self, *args, **kwargs)
270
271 return wrapper
272
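# Illustrative usage (the real call sites are the Dispatcher methods below):
#
#     @_calls_log_tick_msg
#     def _run_cleanup(self):
#         ...
#
# The wrapped method then logs 'Starting _run_cleanup' on every call, but
# only when the tick_debug config option is enabled (see _log_tick_msg).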

class Dispatcher(object):


    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db, _drone_manager)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)
        self._inline_host_acquisition = (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'inline_host_acquisition', type=bool, default=True))

        # If _inline_host_acquisition is set, the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if self._inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()
        # Execute all actions queued in the cleanup tasks. The scheduler tick
        # will run a refresh task first; if there is any action left in the
        # queue, refresh will raise an exception.
        _drone_manager.execute_actions()

        # Always recover processes.
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    # TODO(pprabhu) Drop this metric once tick_times has been verified.
    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/scheduler/tick_durations/tick')
    def tick(self):
        """
        An instrumented version of the scheduler tick: we record when each
        major step begins so we can figure out where most of the tick time
        is spent.
        """
        with metrics.RuntimeBreakdownTimer(
            'chromeos/autotest/scheduler/tick_times') as breakdown_timer:
            self._log_tick_msg('New tick')
            system_utils.DroneCache.refresh()

            with breakdown_timer.Step('garbage_collection'):
                self._garbage_collection()
            with breakdown_timer.Step('trigger_refresh'):
                self._log_tick_msg('Starting _drone_manager.trigger_refresh')
                _drone_manager.trigger_refresh()
            with breakdown_timer.Step('schedule_running_host_queue_entries'):
                self._schedule_running_host_queue_entries()
            with breakdown_timer.Step('schedule_special_tasks'):
                self._schedule_special_tasks()
            with breakdown_timer.Step('schedule_new_jobs'):
                self._schedule_new_jobs()
            with breakdown_timer.Step('sync_refresh'):
                self._log_tick_msg('Starting _drone_manager.sync_refresh')
                _drone_manager.sync_refresh()
            # _run_cleanup must be called between drone_manager.sync_refresh
            # and drone_manager.execute_actions, as sync_refresh clears the
            # calls queued in drones. Therefore, any action that calls
            # drone.queue_call to add calls to drone._calls should run after
            # the drone refresh is completed and before
            # drone_manager.execute_actions at the end of the tick.
            with breakdown_timer.Step('run_cleanup'):
                self._run_cleanup()
            with breakdown_timer.Step('find_aborting'):
                self._find_aborting()
            with breakdown_timer.Step('find_aborted_special_tasks'):
                self._find_aborted_special_tasks()
            with breakdown_timer.Step('handle_agents'):
                self._handle_agents()
            with breakdown_timer.Step('host_scheduler_tick'):
                self._log_tick_msg('Starting _host_scheduler.tick')
                self._host_scheduler.tick()
            with breakdown_timer.Step('drones_execute_actions'):
                self._log_tick_msg('Starting _drone_manager.execute_actions')
                _drone_manager.execute_actions()
            with breakdown_timer.Step('send_queued_emails'):
                self._log_tick_msg(
                        'Starting email_manager.manager.send_queued_emails')
                email_manager.manager.send_queued_emails()
            with breakdown_timer.Step('db_reset_queries'):
                self._log_tick_msg('Starting django.db.reset_queries')
                django.db.reset_queries()

            self._tick_count += 1
            metrics.Counter('chromeos/autotest/scheduler/tick').increment()


    @_calls_log_tick_msg
    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    @_calls_log_tick_msg
    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)
            # If an ID has no more active agents associated, there is no need
            # to keep it in the dictionary. Otherwise, the scheduler would
            # keep an unnecessarily big dictionary until being restarted.
            if not agent_dict[object_id]:
                agent_dict.pop(object_id)


    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatcher's list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. Every agent we create is
        registered in two dicts: one keyed by its queue_entry_ids and one
        keyed by its host_ids. So theoretically a host can have any number
        of agents associated with it, each with any special agent task,
        though in practice we never see more than one agent/task per host
        at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
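        # Bookkeeping sketch (illustrative ids): after registering an agent
        # whose task touches host 3 and queue entry 7, self._host_agents is
        # {3: set([agent])} and self._queue_entry_agents is {7: set([agent])}.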

    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # Reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die.
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqes in the specified states.

        Loosely this translates to taking an hqe in one of the specified
        states, say parsing, and getting an AgentTask for it, like the
        FinalReparseTask, through _get_agent_task_for_queue_entry. Each queue
        entry can only have one agent task at a time, but there might be
        multiple queue entries in the group.

        @return: A list of AgentTasks.
        """
        # Host queue entry statuses handled directly by AgentTasks (Verifying
        # is handled through SpecialTasks, so it is not listed here).
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        hqe_count_by_status = {}
        for entry in queue_entries:
            hqe_count_by_status[entry.status] = (
                hqe_count_by_status.get(entry.status, 0) + 1)
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)

        for status, count in hqe_count_by_status.iteritems():
            metrics.Gauge(
                'chromeos/autotest/scheduler/active_host_queue_entries'
            ).set(count, fields={'status': status})

        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise scheduler_lib.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a
        SpecialAgentTask object contains an hqe, it's passed on to the
        special agent task, which creates a HostQueueEntry and saves it as
        its queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise scheduler_lib.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                           % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting.
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        m = 'chromeos/autotest/errors/unrecovered_orphan_processes'
        metrics.Gauge(m).set(len(orphans))

        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        # Verify is replaced by Reset.
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.RESETTING)
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY,
                              models.SpecialTask.Task.RESET),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                logging.error('Unrecovered Resetting host queue entry: %s. '
                              'Setting status to Queued.', str(queue_entry))
                # Essentially this host queue entry was set to be Resetting,
                # however no special task exists for the entry. This occurs
                # if the scheduler dies between changing the status and
                # creating the special task. By setting it to Queued, the job
                # can restart from the beginning and proceed correctly. This
                # is much more preferable than having monitor_db not launch.
                queue_entry.set_status('Queued')


    @_calls_log_tick_msg
    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.

        Special tasks include PreJobTasks like verify, reset and cleanup.
        They are created through _schedule_new_jobs and associated with an
        hqe. This method translates SpecialTasks to the appropriate AgentTask
        and adds them to the dispatcher's agents list, so _handle_agents can
        execute them.
        """
        # When the host scheduler is responsible for acquisition we only want
        # to run tasks with leased hosts. All hqe tasks will already have
        # leased hosts, and we don't want to run frontend tasks till the host
        # scheduler has vetted the assignment. Note that this doesn't include
        # frontend tasks with hosts leased by other active hqes.
        for task in self._job_query_manager.get_prioritized_special_tasks(
                only_tasks_with_leased_hosts=not self._inline_host_acquisition):
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # Recover active hosts that have not yet been recovered, although this
        # should never happen.
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
                print_message=message)


    DEFAULT_REQUESTED_BY_USER_ID = 1


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
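        # E.g. the call from _recover_hosts below yields:
        #   "locked = 0 AND invalid = 0 AND status = 'Repair Failed'"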
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # Host has already been recovered in some way.
                continue
            if self._host_has_scheduled_special_task(host):
                # Host will have a special task scheduled on the next cycle.
                continue
            if print_message:
                logging.error(print_message, host.hostname)
            try:
                user = models.User.objects.get(login='autotest_system')
            except models.User.DoesNotExist:
                user = models.User.objects.get(
                        id=self.DEFAULT_REQUESTED_BY_USER_ID)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.RESET,
                    host=models.Host.objects.get(id=host.id),
                    requested_by=user)


    def _recover_hosts(self):
        # Recover "Repair Failed" hosts.
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _refresh_pending_queue_entries(self):
        """
        Look up the pending HostQueueEntries and return them.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._job_query_manager.get_pending_queue_entries(
                only_hostless=not self._inline_host_acquisition)
        if not queue_entries:
            return []
        return queue_entries


    def _schedule_hostless_job(self, queue_entry):
        """Schedule a hostless (suite) job.

        @param queue_entry: The queue_entry representing the hostless job.
        """
        self.add_agent_task(HostlessQueueTask(queue_entry))

        # Need to set execution_subdir before setting the status:
        # After a restart of the scheduler, agents will be restored for HQEs
        # in Starting, Running, Gathering, Parsing or Archiving. To do this,
        # the execution_subdir is needed. Therefore it must be set before
        # entering one of these states.
        # Otherwise, if the scheduler was interrupted between setting the
        # status and the execution_subdir, restoring agents would fail upon
        # its restart.
        # Is there a way to get a status in one of these states without going
        # through this code? The following cases are possible:
        # - If it's aborted before being started:
        #   active bit will be 0, so there's nothing to parse, it will just be
        #   set to completed by _find_aborting. Critical statuses are skipped.
        # - If it's aborted or it fails after being started:
        #   It was started, so this code was executed.
        queue_entry.update_field('execution_subdir', 'hostless')
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_host_job(self, host, queue_entry):
        """Schedules a job on the given host.

        1. Assign the host to the hqe, if it isn't already assigned.
        2. Create a SpecialAgentTask for the hqe.
        3. Activate the hqe.

        @param queue_entry: The job to schedule.
        @param host: The host to schedule the job on.
        """
        if self.host_has_agent(host):
            host_agent_task = list(self._host_agents.get(host.id))[0].task
        else:
            self._host_scheduler.schedule_host_job(host, queue_entry)


    @_calls_log_tick_msg
    def _schedule_new_jobs(self):
        """
        Find any new HQEs and call schedule_pre_job_tasks for them.

        This involves setting the status of the HQE and creating a row in the
        db corresponding to the special task, through
        scheduler_models._queue_special_task. The new db row is then added as
        an agent to the dispatcher through _schedule_special_tasks and
        scheduled for execution on the drone through _handle_agents.
        """
        queue_entries = self._refresh_pending_queue_entries()

        key = 'scheduler.jobs_per_tick'
        new_hostless_jobs = 0
        new_jobs_with_hosts = 0
        new_jobs_need_hosts = 0
        host_jobs = []
        logging.debug('Processing %d queue_entries', len(queue_entries))

        for queue_entry in queue_entries:
            if queue_entry.is_hostless():
                self._schedule_hostless_job(queue_entry)
                new_hostless_jobs = new_hostless_jobs + 1
            else:
                host_jobs.append(queue_entry)
                new_jobs_need_hosts = new_jobs_need_hosts + 1

        metrics.Counter(
            'chromeos/autotest/scheduler/scheduled_jobs_hostless'
        ).increment_by(new_hostless_jobs)

        if not host_jobs:
            return

        if not self._inline_host_acquisition:
            # In this case, host_scheduler is responsible for scheduling
            # host_jobs. Scheduling the jobs ourselves can lead to DB
            # corruption since host_scheduler assumes it is the single
            # process scheduling host jobs.
            metrics.Gauge(
                'chromeos/autotest/errors/scheduler/unexpected_host_jobs').set(
                    len(host_jobs))
            return

        jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
        for host_assignment in jobs_with_hosts:
            self._schedule_host_job(host_assignment.host, host_assignment.job)
            new_jobs_with_hosts = new_jobs_with_hosts + 1

        metrics.Counter(
            'chromeos/autotest/scheduler/scheduled_jobs_with_hosts'
        ).increment_by(new_jobs_with_hosts)
        # TODO(pprabhu): Decide what to do about this metric. Million dollar
        # question: What happens to jobs that were not matched? Do they stay
        # in the queue and get processed right here in the next tick (then we
        # want a gauge corresponding to the number of outstanding unmatched
        # host jobs), or are they handled somewhere else (then we need a
        # counter corresponding to failed_to_match_with_hosts jobs)?
        #autotest_stats.Gauge(key).send('new_jobs_without_hosts',
        #                               new_jobs_need_hosts -
        #                               new_jobs_with_hosts)


    @_calls_log_tick_msg
    def _schedule_running_host_queue_entries(self):
        """
        Adds agents to the dispatcher.

        Any AgentTask, like the QueueTask, is wrapped in an Agent. The
        QueueTask for example, will have a job with a control file, and
        the agent will have methods that poll, abort and check if the queue
        task is finished. The dispatcher runs the agent_task, as well as
        other agents in its _agents member, through _handle_agents, by
        calling the Agent's tick().

        This method creates an agent for each HQE in one of (starting,
        running, gathering, parsing, archiving) states, and adds it to the
        dispatcher so it is handled by _handle_agents.
        """
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    @_calls_log_tick_msg
    def _find_aborting(self):
        """
        Looks through the afe_host_queue_entries for aborted entries.

        The aborted bit is set on an HQE in many ways, the most common
        being when a user requests an abort through the frontend, which
        results in an rpc from the afe to abort_host_queue_entries.
        """
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted=1 and complete=0'):

            # If the job is running on a shard, let the shard handle aborting
            # it and sync back the right status.
            if entry.job.shard_id is not None and not server_utils.is_shard():
                logging.info('Waiting for shard %s to abort hqe %s',
                             entry.job.shard_id, entry)
                continue

            logging.info('Aborting %s', entry)

            # The task would have started off with both is_complete and
            # is_active = False. Aborted tasks are neither active nor
            # complete. For all currently active tasks this will happen
            # through the agent, but we need to manually update the special
            # tasks that haven't started yet, because they don't have agents.
            models.SpecialTask.objects.filter(is_active=False,
                    queue_entry_id=entry.id).update(is_complete=True)

            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()


    @_calls_log_tick_msg
    def _find_aborted_special_tasks(self):
        """
        Find SpecialTasks that have been marked for abortion.

        Poll the database looking for SpecialTasks that are active
        and have been marked for abortion, then abort them.
        """

        # The completed and active bits are very important when it comes
        # to scheduler correctness. The active bit is set through the prolog
        # of a special task, and reset through the cleanup method of the
        # SpecialAgentTask. The cleanup is called both through the abort and
        # epilog. The complete bit is set in several places, and in general
        # a hanging job will have is_active=1 is_complete=0, while a special
        # task which completed will have is_active=0 is_complete=1. To check
        # aborts we directly check active because the complete bit is set in
        # several places, including the epilog of agent tasks.
        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
                                                          is_aborted=True)
        for task in aborted_tasks:
            # There are 2 ways to get the agent associated with a task:
            # through the host and through the hqe. A special task
            # always needs a host, but doesn't always need an hqe.
            for agent in self._host_agents.get(task.host.id, []):
                if isinstance(agent.task, agent_task.SpecialAgentTask):

                    # The epilog performs critical actions such as
                    # queueing the next SpecialTask, requeuing the
                    # hqe etc, however it doesn't actually kill the
                    # monitor process and set the 'done' bit. Epilogs
                    # assume that the job failed, and that the monitor
                    # process has already written an exit code. The
                    # done bit is a necessary condition for
                    # _handle_agents to schedule any more special
                    # tasks against the host, and it must be set
                    # in addition to is_active, is_complete and success.
                    agent.task.epilog()
                    agent.task.abort()


    def _can_start_agent(self, agent, have_reached_limit):
        # Always allow zero-process agents to run.
        if agent.task.num_processes == 0:
            return True
        # Don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents).
        if have_reached_limit:
            return False
        # Total process throttling.
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        return True

1003
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -08001004 @_calls_log_tick_msg
jadmanski0afbb632008-06-06 21:10:57 +00001005 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -07001006 """
1007 Handles agents of the dispatcher.
1008
1009 Appropriate Agents are added to the dispatcher through
1010 _schedule_running_host_queue_entries. These agents each
1011 have a task. This method runs the agents task through
1012 agent.tick() leading to:
1013 agent.start
1014 prolog -> AgentTasks prolog
1015 For each queue entry:
1016 sets host status/status to Running
1017 set started_on in afe_host_queue_entries
1018 run -> AgentTasks run
1019 Creates PidfileRunMonitor
1020 Queues the autoserv command line for this AgentTask
1021 via the drone manager. These commands are executed
1022 through the drone managers execute actions.
1023 poll -> AgentTasks/BaseAgentTask poll
1024 checks the monitors exit_code.
1025 Executes epilog if task is finished.
1026 Executes AgentTasks _finish_task
1027 finish_task is usually responsible for setting the status
1028 of the HQE/host, and updating it's active and complete fileds.
1029
1030 agent.is_done
1031 Removed the agent from the dispatchers _agents queue.
1032 Is_done checks the finished bit on the agent, that is
1033 set based on the Agents task. During the agents poll
1034 we check to see if the monitor process has exited in
1035 it's finish method, and set the success member of the
1036 task based on this exit code.
1037 """
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001038 num_started_this_tick = 0
1039 num_finished_this_tick = 0
showard4c5374f2008-09-04 17:02:56 +00001040 have_reached_limit = False
1041 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -07001042 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +00001043 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -07001044 self._log_extra_msg('Processing Agent with Host Ids: %s and '
1045 'queue_entry ids:%s' % (agent.host_ids,
1046 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +00001047 if not agent.started:
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001048 if not self._can_start_agent(agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001049 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -07001050 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +00001051 continue
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001052 num_started_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001053 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +00001054 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -07001055 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +00001056 if agent.is_done():
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001057 num_finished_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001058 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +00001059 self.remove_agent(agent)
Prathmesh Prabhued7ece92016-11-23 11:19:43 -08001060
1061 metrics.Counter(
1062 'chromeos/autotest/scheduler/agent_processes_started'
1063 ).increment_by(num_started_this_tick)
1064 metrics.Counter(
1065 'chromeos/autotest/scheduler/agent_processes_finished'
1066 ).increment_by(num_finished_this_tick)
1067 num_agent_processes = _drone_manager.total_running_processes()
1068 metrics.Gauge(
1069 'chromeos/autotest/scheduler/agent_processes'
1070 ).set(num_agent_processes)
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001071 logging.info('%d running processes. %d added this tick.',
Prathmesh Prabhued7ece92016-11-23 11:19:43 -08001072 num_agent_processes, num_started_this_tick)
mbligh36768f02008-02-22 18:28:33 +00001073
1074
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -08001075 def _log_tick_msg(self, msg):
1076 if self._tick_debug:
1077 logging.debug(msg)
1078
1079
1080 def _log_extra_msg(self, msg):
1081 if self._extra_debugging:
1082 logging.debug(msg)
1083
1084
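# A minimal sketch of the per-tick agent handling documented in
# _handle_agents above. The dispatcher argument is any object with an
# _agents list and a remove_agent() method; _example_drive_agents is a
# hypothetical name used only for illustration, not scheduler code.
def _example_drive_agents(dispatcher):
    # Iterate over a copy, since finished agents are removed mid-loop.
    for agent in list(dispatcher._agents):
        # Per the docstring above, the first tick leads to the task's
        # prolog/run; later ticks poll the task's PidfileRunMonitor.
        agent.tick()
        # is_done() reflects the finished bit, set once the task's
        # poll() reports completion.
        if agent.is_done():
            dispatcher.remove_agent(agent)

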
mbligh36768f02008-02-22 18:28:33 +00001085class Agent(object):
showard77182562009-06-10 00:16:05 +00001086 """
Alex Miller47715eb2013-07-24 03:34:01 -07001087 An agent for use by the Dispatcher class to perform a task. An agent wraps
1088 around an AgentTask mainly to associate the AgentTask with the queue_entry
1089 and host ids.
showard77182562009-06-10 00:16:05 +00001090
1091 The following methods are required on all task objects:
1092 poll() - Called periodically to let the task check its status and
1093 update its internal state.
1094 is_done() - Returns True if the task is finished.
1095 abort() - Called when an abort has been requested. The task must
1096 set its aborted attribute to True if it actually aborted.
1097
1098 The following attributes are required on all task objects:
1099 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001100 success - bool, True if this task succeeded.
1101 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1102 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001103 """
1104
1105
showard418785b2009-11-23 20:19:59 +00001106 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001107 """
Alex Miller47715eb2013-07-24 03:34:01 -07001108 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001109 """
showard8cc058f2009-09-08 16:26:33 +00001110 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001111
showard77182562009-06-10 00:16:05 +00001112 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001113 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001114
showard8cc058f2009-09-08 16:26:33 +00001115 self.queue_entry_ids = task.queue_entry_ids
1116 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001117
showard8cc058f2009-09-08 16:26:33 +00001118 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001119 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001120
1121
jadmanski0afbb632008-06-06 21:10:57 +00001122 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001123 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001124 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001125 self.task.poll()
1126 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001127 self.finished = True
showardec113162008-05-08 00:52:49 +00001128
1129
jadmanski0afbb632008-06-06 21:10:57 +00001130 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001131 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001132
1133
showardd3dc1992009-04-22 21:01:40 +00001134 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001135 if self.task:
1136 self.task.abort()
1137 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001138 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001139 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001140
showardd3dc1992009-04-22 21:01:40 +00001141
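# A minimal, hypothetical task satisfying the contract documented on
# Agent above. _ExampleNoopTask is illustrative only; real tasks such as
# AbstractQueueTask below carry far more state (monitors, keyvals, ...).
class _ExampleNoopTask(object):
    def __init__(self, queue_entry_ids=(), host_ids=()):
        self.aborted = False
        self.success = None
        self.queue_entry_ids = list(queue_entry_ids)
        self.host_ids = list(host_ids)
        self._done = False


    def poll(self):
        # A real task would check its process monitor here; this one
        # simply finishes, successfully, on its first poll.
        self._done = True
        self.success = True


    def is_done(self):
        return self._done


    def abort(self):
        # Tasks may choose to ignore aborts; this one honors them.
        self.aborted = True
        self._done = True


# Usage sketch: the dispatcher only ever sees the Agent wrapper.
#     agent = Agent(_ExampleNoopTask())
#     agent.tick()            # polls the task, which finishes at once
#     assert agent.is_done()

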
beeps5e2bb4a2013-10-28 11:26:45 -07001142class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001143 """
1144 Common functionality for QueueTask and HostlessQueueTask
1145 """
1146 def __init__(self, queue_entries):
1147 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001148 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001149 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001150
1151
showard73ec0442009-02-07 02:05:20 +00001152 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001153 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001154
1155
jamesrenc44ae992010-02-19 00:12:54 +00001156 def _write_control_file(self, execution_path):
1157 control_path = _drone_manager.attach_file_to_execution(
1158 execution_path, self.job.control_file)
1159 return control_path
1160
1161
Aviv Keshet308e7362013-05-21 14:43:16 -07001162 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001163 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001164 execution_path = self.queue_entries[0].execution_path()
1165 control_path = self._write_control_file(execution_path)
1166 hostnames = ','.join(entry.host.hostname
1167 for entry in self.queue_entries
1168 if not entry.is_hostless())
1169
1170 execution_tag = self.queue_entries[0].execution_tag()
1171 params = _autoserv_command_line(
1172 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001173 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001174 _drone_manager.absolute_path(control_path)],
1175 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001176 if self.job.is_image_update_job():
1177 params += ['--image', self.job.update_image_path]
1178
jamesrenc44ae992010-02-19 00:12:54 +00001179 return params
showardd1195652009-12-08 22:21:02 +00001180
1181
1182 @property
1183 def num_processes(self):
1184 return len(self.queue_entries)
1185
1186
1187 @property
1188 def owner_username(self):
1189 return self.job.owner
1190
1191
1192 def _working_directory(self):
1193 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001194
1195
jadmanski0afbb632008-06-06 21:10:57 +00001196 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001197 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001198 keyval_dict = self.job.keyval_dict()
1199 keyval_dict[queued_key] = queued_time
showardf1ae3542009-05-11 19:26:02 +00001200 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001201 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001202 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001203 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001204
1205
showard35162b02009-03-03 02:17:30 +00001206 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001207 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001208 _drone_manager.write_lines_to_file(error_file_path,
1209 [_LOST_PROCESS_ERROR])
1210
1211
showardd3dc1992009-04-22 21:01:40 +00001212 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001213 if not self.monitor:
1214 return
1215
showardd9205182009-04-27 20:09:55 +00001216 self._write_job_finished()
1217
showard35162b02009-03-03 02:17:30 +00001218 if self.monitor.lost_process:
1219 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001220
jadmanskif7fa2cc2008-10-01 14:13:23 +00001221
showardcbd74612008-11-19 21:42:02 +00001222 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001223 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001224 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001225 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001226 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001227
1228
jadmanskif7fa2cc2008-10-01 14:13:23 +00001229 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001230 if not self.monitor or not self.monitor.has_process():
1231 return
1232
jadmanskif7fa2cc2008-10-01 14:13:23 +00001233 # build up sets of all the aborted_by and aborted_on values
1234 aborted_by, aborted_on = set(), set()
1235 for queue_entry in self.queue_entries:
1236 if queue_entry.aborted_by:
1237 aborted_by.add(queue_entry.aborted_by)
1238 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1239 aborted_on.add(t)
1240
1241 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001242 # TODO(showard): this conditional is now obsolete, we just need to leave
1243 # it in temporarily for backwards compatibility over upgrades. delete
1244 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001245 assert len(aborted_by) <= 1
1246 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001247 aborted_by_value = aborted_by.pop()
1248 aborted_on_value = max(aborted_on)
1249 else:
1250 aborted_by_value = 'autotest_system'
1251 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001252
showarda0382352009-02-11 23:36:43 +00001253 self._write_keyval_after_job("aborted_by", aborted_by_value)
1254 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001255
showardcbd74612008-11-19 21:42:02 +00001256 aborted_on_string = str(datetime.datetime.fromtimestamp(
1257 aborted_on_value))
1258 self._write_status_comment('Job aborted by %s on %s' %
1259 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001260
1261
jadmanski0afbb632008-06-06 21:10:57 +00001262 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001263 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001264 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001265 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001266
1267
jadmanski0afbb632008-06-06 21:10:57 +00001268 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001269 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001270 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001271
1272
1273class QueueTask(AbstractQueueTask):
1274 def __init__(self, queue_entries):
1275 super(QueueTask, self).__init__(queue_entries)
1276 self._set_ids(queue_entries=queue_entries)
Prathmesh Prabhu468c32c2016-12-20 15:12:30 -08001277 self._enable_ssp_container = (
1278 global_config.global_config.get_config_value(
1279 'AUTOSERV', 'enable_ssp_container', type=bool,
1280 default=True))
showarda9545c02009-12-18 22:44:26 +00001281
1282
1283 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001284 self._check_queue_entry_statuses(
1285 self.queue_entries,
1286 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
1287 models.HostQueueEntry.Status.RUNNING),
1288 allowed_host_statuses=(models.Host.Status.PENDING,
1289 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00001290
1291 super(QueueTask, self).prolog()
1292
1293 for queue_entry in self.queue_entries:
1294 self._write_host_keyvals(queue_entry.host)
1295 queue_entry.host.set_status(models.Host.Status.RUNNING)
1296 queue_entry.host.update_field('dirty', 1)
showarda9545c02009-12-18 22:44:26 +00001297
1298
1299 def _finish_task(self):
1300 super(QueueTask, self)._finish_task()
1301
1302 for queue_entry in self.queue_entries:
1303 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00001304 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00001305
1306
Alex Miller9f01d5d2013-08-08 02:26:01 -07001307 def _command_line(self):
Dan Shi36cfd832014-10-10 13:38:51 -07001308 invocation = super(QueueTask, self)._command_line()
1309 # Check if server-side packaging is needed.
Prathmesh Prabhu468c32c2016-12-20 15:12:30 -08001310 if (self._enable_ssp_container and
Dan Shi36cfd832014-10-10 13:38:51 -07001311 self.job.control_type == control_data.CONTROL_TYPE.SERVER and
1312 self.job.require_ssp != False):
Dan Shiec1d47d2015-02-13 11:38:13 -08001313 invocation += ['--require-ssp']
Dan Shi36cfd832014-10-10 13:38:51 -07001314 keyval_dict = self.job.keyval_dict()
1315 test_source_build = keyval_dict.get('test_source_build', None)
1316 if test_source_build:
1317 invocation += ['--test_source_build', test_source_build]
Dan Shi70647ca2015-07-16 22:52:35 -07001318 if self.job.parent_job_id:
1319 invocation += ['--parent_job_id', str(self.job.parent_job_id)]
Dan Shi36cfd832014-10-10 13:38:51 -07001320 return invocation + ['--verify_job_repo_url']
Alex Miller9f01d5d2013-08-08 02:26:01 -07001321
1322
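# A condensed sketch of the flag layering QueueTask._command_line
# performs above, pulled out as a hypothetical helper for clarity. The
# job argument mirrors the fields used there (control_type, require_ssp,
# keyval_dict(), parent_job_id); _example_extra_autoserv_flags is not
# scheduler code.
def _example_extra_autoserv_flags(job, enable_ssp_container=True):
    flags = []
    # Server-side packaging applies only to server jobs that have not
    # explicitly opted out.
    if (enable_ssp_container and
        job.control_type == control_data.CONTROL_TYPE.SERVER and
        job.require_ssp != False):
        flags += ['--require-ssp']
        test_source_build = job.keyval_dict().get('test_source_build')
        if test_source_build:
            flags += ['--test_source_build', test_source_build]
    if job.parent_job_id:
        flags += ['--parent_job_id', str(job.parent_job_id)]
    return flags + ['--verify_job_repo_url']

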
Dan Shi1a189052013-10-28 14:41:35 -07001323class HostlessQueueTask(AbstractQueueTask):
mbligh4608b002010-01-05 18:22:35 +00001324 def __init__(self, queue_entry):
1325 super(HostlessQueueTask, self).__init__([queue_entry])
1326 self.queue_entry_ids = [queue_entry.id]
1327
1328
1329 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001330 super(HostlessQueueTask, self).prolog()
1331
1332
mbligh4608b002010-01-05 18:22:35 +00001333 def _finish_task(self):
1334 super(HostlessQueueTask, self)._finish_task()
Dan Shi76af8022013-10-19 01:59:49 -07001335
1336 # When a job is added to the database, its initial status is always
1337 # Starting. In a scheduler tick, the scheduler finds all jobs in Starting
1338 # status and checks whether any of them can be started. If the scheduler
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001339 # hits some limit, e.g., max_hostless_jobs_per_drone, it will
1340 # leave these jobs in Starting status. Otherwise, the jobs'
1341 # status will be changed to Running, and an autoserv process
1342 # will be started in a drone for each of these jobs.
Dan Shi76af8022013-10-19 01:59:49 -07001343 # If the entry is still in status Starting, the process has not started
1344 # yet. Therefore, there is no need to parse and collect logs. Without
1345 # this check, an exception would be raised by the scheduler, as
1346 # execution_subdir for this queue entry does not have a value yet.
1347 hqe = self.queue_entries[0]
1348 if hqe.status != models.HostQueueEntry.Status.STARTING:
1349 hqe.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001350
1351
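# A condensed sketch of the guard in HostlessQueueTask._finish_task
# above; _example_finish_hostless is a hypothetical helper, not
# scheduler code.
def _example_finish_hostless(hqe):
    # An entry still in Starting never launched autoserv, so it has no
    # execution_subdir and nothing to parse.
    if hqe.status != models.HostQueueEntry.Status.STARTING:
        hqe.set_status(models.HostQueueEntry.Status.PARSING)

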
mbligh36768f02008-02-22 18:28:33 +00001352if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00001353 main()