blob: 7a8fad13219c67aeb26d0b65cefba8733715ce8d [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Richard Barnetteffed1722016-05-18 15:57:22 -07002
3#pylint: disable=C0111
4
mbligh36768f02008-02-22 18:28:33 +00005"""
6Autotest scheduler
7"""
showard909c7a62008-07-15 21:52:38 +00008
Dan Shif6c65bd2014-08-29 16:15:07 -07009import datetime
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -080010import functools
Dan Shif6c65bd2014-08-29 16:15:07 -070011import gc
12import logging
13import optparse
14import os
15import signal
16import sys
17import time
showard402934a2009-12-21 22:20:47 +000018
Alex Miller05d7b4c2013-03-04 07:49:38 -080019import common
showard21baa452008-10-21 00:08:39 +000020from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000021
22import django.db
23
Dan Shiec1d47d2015-02-13 11:38:13 -080024from autotest_lib.client.common_lib import control_data
Prashanth B0e960282014-05-13 19:38:28 -070025from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070026from autotest_lib.client.common_lib import utils
xixuan4de4e742017-01-30 13:31:35 -080027from autotest_lib.frontend.afe import models
Fang Dengc330bee2014-10-21 18:10:55 -070028from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070029from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
30from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070031from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070032from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070033from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000034from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080035from autotest_lib.scheduler import status_server, scheduler_config
Aviv Keshet308e7362013-05-21 14:43:16 -070036from autotest_lib.server import autoserv_utils
Dan Shi114e1722016-01-10 18:12:53 -080037from autotest_lib.server import system_utils
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080038from autotest_lib.server import utils as server_utils
Dan Shicf2e8dd2015-05-07 17:18:48 -070039from autotest_lib.site_utils import metadata_reporter
Dan Shib9144a42014-12-01 16:09:32 -080040from autotest_lib.site_utils import server_manager_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080041
Dan Shi5e2efb72017-02-07 11:40:23 -080042try:
43 from chromite.lib import metrics
44 from chromite.lib import ts_mon_config
45except ImportError:
46 metrics = utils.metrics_mock
47 ts_mon_config = utils.metrics_mock
48
Dan Shicf2e8dd2015-05-07 17:18:48 -070049
# Prefixes of the pid files used for the babysitter and for the scheduler
# itself.
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Default results directory; main_without_exception_handling() replaces it
# with the directory given on the command line.
RESULTS_DIR = '.'

# Root of the Autotest tree: the AUTOTEST_DIR environment variable when it is
# set, otherwise the parent of the directory containing this file.
# os.environ.get() is used instead of the Python 2-only dict.has_key().
AUTOTEST_PATH = os.environ.get(
        'AUTOTEST_DIR', os.path.join(os.path.dirname(__file__), '..'))
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

# Put the server directory first on the import path, once.
if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000063
# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Database connection manager and the connection obtained from it; both are
# assigned by initialize().
_db_manager = None
_db = None
# Set to True by handle_signal() to make the main loop stop ticking.
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
# Set to True when the scheduler is started with --test.
_testing_mode = False
# Drone manager instance; assigned by initialize_globals().
_drone_manager = None
Prashanth Bf66d51b2014-05-06 12:42:25 -070080
mbligh36768f02008-02-22 18:28:33 +000081
def _site_init_monitor_db_dummy():
    """Fallback used when no site-specific init function is available.

    @returns An empty dict.
    """
    return dict()
84
85
def _verify_default_drone_set_exists():
    """Raise if drone sets are enabled but no default drone set is named.

    @raises scheduler_lib.SchedulerError when drone sets are enabled and the
            default drone set name is empty.
    """
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise scheduler_lib.SchedulerError(
            'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000091
92
def _sanity_check():
    """Check configuration consistency before the scheduler starts.

    Currently this only verifies that a default drone set exists when drone
    sets are enabled.
    """
    _verify_default_drone_set_exists()
96
97
def main():
    """Run the scheduler, logging any escaping exception.

    SystemExit is passed through untouched; every other exception is logged
    and then re-raised. The scheduler pid file is removed in all cases.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        # A deliberate exit is not an error worth a stack trace.
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000109
110
def main_without_exception_handling():
    """Parse the command line, start the helper services and run the tick loop.

    Exactly one positional argument, the results directory, is required;
    otherwise the usage message is printed and the function returns. The
    process exits with status 1 when the scheduler is disabled in the global
    config.

    The loop ticks the dispatcher until a shutdown is requested through
    handle_signal() or through the status server, sleeping so that a tick
    takes at least `minimum_tick_sec` seconds. When the loop ends, or fails,
    the metadata reporter, queued emails, status server, drone manager and
    database connection are shut down.
    """
    global RESULTS_DIR
    global _autoserv_path
    global _testing_mode

    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
            "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
            _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        _autoserv_path = 'autoserv_dummy'
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    # Start the thread to report metadata.
    metadata_reporter.start()

    with ts_mon_config.SetupTsMonGlobalState('autotest_scheduler',
                                             indirect=True):
        try:
            initialize()
            dispatcher = Dispatcher()
            dispatcher.initialize(recover_hosts=options.recover_hosts)
            minimum_tick_sec = global_config.global_config.get_config_value(
                    scheduler_config.CONFIG_SECTION, 'minimum_tick_sec',
                    type=float)

            while not _shutdown and not server._shutdown_scheduler:
                start = time.time()
                dispatcher.tick()
                curr_tick_sec = time.time() - start
                if minimum_tick_sec > curr_tick_sec:
                    time.sleep(minimum_tick_sec - curr_tick_sec)
                else:
                    time.sleep(0.0001)
        except server_manager_utils.ServerActionError as e:
            # This error is expected when the server is not in primary status
            # for scheduler role. Thus do not send email for it.
            logging.exception(e)
        except Exception:
            # Record the stack trace; counting the failure alone would leave
            # no clue about why the scheduler terminated.
            logging.exception('Uncaught exception, terminating monitor_db.')
            metrics.Counter('chromeos/autotest/scheduler/uncaught_exception'
                            ).increment()

    metadata_reporter.abort()
    email_manager.manager.send_queued_emails()
    server.shutdown()
    # initialize() can fail before these globals are assigned (for example
    # when the server role check raises ServerActionError), so only shut
    # down what was actually created.
    if _drone_manager is not None:
        _drone_manager.shutdown()
    if _db_manager is not None:
        _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000204
205
def handle_signal(signum, frame):
    """Signal handler that asks the main loop to stop.

    @param signum: Signal number (unused).
    @param frame: Interrupted stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000210
211
def initialize():
    """
    One-time process setup performed before the dispatcher is created.

    Exits with status 1 if another monitor_db is already alive according to
    its pid file; otherwise writes this process's pid file, connects to the
    database, installs the SIGINT/SIGTERM handlers and initializes the
    scheduler models and the drone manager.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    # In testing mode, point the database config at the stress-test database.
    if _testing_mode:
        global_config.global_config.override_config_value(
            scheduler_lib.DB_CONFIG_SECTION, 'database',
            'stresstest_autotest_web')

    # If server database is enabled, check if the server has role `scheduler`.
    # If the server does not have scheduler role, exception will be raised and
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    # Prepend the autotest server directory to the executable search path.
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    # Sets _drone_manager, which is used below.
    initialize_globals()
    scheduler_models.initialize()

    drone_list = system_utils.get_drones()
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000251
252
def initialize_globals():
    """Bind the module-level _drone_manager to the drone manager instance."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
256
257
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line for a job run from the scheduler.

    The command is produced by autoserv_utils.autoserv_run_job_command using
    the module-level autoserv directory, the drone manager's working
    directory as results directory, and in_lab=True.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - Passed through to autoserv_run_job_command.

    @returns The autoserv command line as a list of executable + parameters.
    """
    return autoserv_utils.autoserv_run_job_command(
            _autoserv_directory,
            machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args,
            job=job,
            queue_entry=queue_entry,
            verbose=verbose,
            in_lab=True)
showard87ba02a2009-04-20 19:37:32 +0000276
def _calls_log_tick_msg(func):
    """Decorator that logs a tick message before running a dispatcher method.

    The wrapped method's instance must provide _log_tick_msg(msg); it is
    called with 'Starting <method name>' before the method itself runs.

    @param func: The method to wrap.
    @returns The wrapping function, carrying func's metadata.
    """
    def traced(self, *args, **kwargs):
        self._log_tick_msg('Starting %s' % func.__name__)
        return func(self, *args, **kwargs)

    return functools.wraps(func)(traced)
285
showard87ba02a2009-04-20 19:37:32 +0000286
Simran Basia858a232012-08-21 11:04:37 -0700287class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800288
289
def __init__(self):
    """Set up agent bookkeeping, cleanup helpers and config-driven options.

    Uses the module-level _db and _drone_manager, so initialize() must have
    run first.
    """
    # All options below live in the scheduler config section.
    read_option = functools.partial(
            global_config.global_config.get_config_value,
            scheduler_config.CONFIG_SECTION)

    self._agents = []
    self._last_clean_time = time.time()
    self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, scheduler_config.config.clean_interval_minutes)
    self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
            _db, _drone_manager)

    # Agents indexed by host id and by queue entry id.
    self._host_agents = {}
    self._queue_entry_agents = {}

    self._tick_count = 0
    self._last_garbage_stats_time = time.time()
    gc_stats_interval_mins = read_option(
            'gc_stats_interval_mins', type=int, default=6*60)
    self._seconds_between_garbage_stats = 60 * gc_stats_interval_mins

    self._tick_debug = read_option('tick_debug', type=bool, default=False)
    self._extra_debugging = read_option(
            'extra_debugging', type=bool, default=False)
    self._inline_host_acquisition = read_option(
            'inline_host_acquisition', type=bool, default=True)

    # If _inline_host_acquisition is set the scheduler will acquire and
    # release hosts against jobs inline, with the tick. Otherwise the
    # scheduler will only focus on jobs that already have hosts, and
    # will not explicitly unlease a host when a job finishes using it.
    self._job_query_manager = query_managers.AFEJobQueryManager()
    if self._inline_host_acquisition:
        self._host_scheduler = host_scheduler.BaseHostScheduler()
    else:
        self._host_scheduler = host_scheduler.DummyHostScheduler()
325
mbligh36768f02008-02-22 18:28:33 +0000326
def initialize(self, recover_hosts=True):
    """
    Prepare the dispatcher before the first tick.

    Initializes both cleanup helpers, flushes the drone actions they
    queued, and recovers processes.

    @param recover_hosts: When true, self._recover_hosts() is run after
            process recovery.
    """
    self._periodic_cleanup.initialize()
    self._24hr_upkeep.initialize()
    # Execute all actions queued in the cleanup tasks. Scheduler tick will
    # run a refresh task first. If there is any action in the queue, refresh
    # will raise an exception.
    _drone_manager.execute_actions()

    # always recover processes
    self._recover_processes()

    if recover_hosts:
        self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000340
341
# TODO(pprabhu) Drop this metric once tick_times has been verified.
@metrics.SecondsTimerDecorator(
        'chromeos/autotest/scheduler/tick_durations/tick')
def tick(self):
    """
    Run one scheduler cycle.

    Each major step runs inside its own breakdown_timer.Step so that the
    'tick_times' metric shows where the tick time is being spent. The tick
    counter and the 'tick' metric counter are incremented once all steps
    have run.
    """
    with metrics.RuntimeBreakdownTimer(
        'chromeos/autotest/scheduler/tick_times') as breakdown_timer:
        self._log_tick_msg('New tick')
        system_utils.DroneCache.refresh()

        with breakdown_timer.Step('garbage_collection'):
            self._garbage_collection()
        with breakdown_timer.Step('trigger_refresh'):
            self._log_tick_msg('Starting _drone_manager.trigger_refresh')
            _drone_manager.trigger_refresh()
        with breakdown_timer.Step('schedule_running_host_queue_entries'):
            self._schedule_running_host_queue_entries()
        with breakdown_timer.Step('schedule_special_tasks'):
            self._schedule_special_tasks()
        with breakdown_timer.Step('schedule_new_jobs'):
            self._schedule_new_jobs()
        with breakdown_timer.Step('sync_refresh'):
            self._log_tick_msg('Starting _drone_manager.sync_refresh')
            _drone_manager.sync_refresh()
        # _run_cleanup must be called between drone_manager.sync_refresh,
        # and drone_manager.execute_actions, as sync_refresh will clear the
        # calls queued in drones. Therefore, any action that calls
        # drone.queue_call to add calls to the drone._calls, should be after
        # drone refresh is completed and before
        # drone_manager.execute_actions at the end of the tick.
        with breakdown_timer.Step('run_cleanup'):
            self._run_cleanup()
        with breakdown_timer.Step('find_aborting'):
            self._find_aborting()
        with breakdown_timer.Step('find_aborted_special_tasks'):
            self._find_aborted_special_tasks()
        with breakdown_timer.Step('handle_agents'):
            self._handle_agents()
        with breakdown_timer.Step('host_scheduler_tick'):
            self._log_tick_msg('Starting _host_scheduler.tick')
            self._host_scheduler.tick()
        with breakdown_timer.Step('drones_execute_actions'):
            self._log_tick_msg('Starting _drone_manager.execute_actions')
            _drone_manager.execute_actions()
        with breakdown_timer.Step('send_queued_emails'):
            self._log_tick_msg(
                'Starting email_manager.manager.send_queued_emails')
            email_manager.manager.send_queued_emails()
        with breakdown_timer.Step('db_reset_queries'):
            self._log_tick_msg('Starting django.db.reset_queries')
            django.db.reset_queries()

        self._tick_count += 1
        metrics.Counter('chromeos/autotest/scheduler/tick').increment()
mbligh36768f02008-02-22 18:28:33 +0000400
showard97aed502008-11-04 02:01:24 +0000401
@_calls_log_tick_msg
def _run_cleanup(self):
    """Give the periodic cleanup, then the 24 hour upkeep, a chance to run."""
    for cleanup in (self._periodic_cleanup, self._24hr_upkeep):
        cleanup.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000406
mbligh36768f02008-02-22 18:28:33 +0000407
@_calls_log_tick_msg
def _garbage_collection(self):
    """Collect garbage and log collector stats, at most once per interval.

    Nothing happens until self._seconds_between_garbage_stats seconds have
    passed since the previous report.
    """
    earliest_allowed = time.time() - self._seconds_between_garbage_stats
    # Only generate these reports once the interval has elapsed.
    if earliest_allowed >= self._last_garbage_stats_time:
        self._last_garbage_stats_time = time.time()
        # Force a full collection (because we can, it doesn't hurt at this
        # interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()
422
423
def _register_agent_for_ids(self, agent_dict, object_ids, agent):
    """Add agent to the set stored under each of object_ids in agent_dict.

    @param agent_dict: Dict mapping an object id to a set of agents.
    @param object_ids: Iterable of ids to register the agent under.
    @param agent: The agent to register.
    """
    for key in object_ids:
        registered = agent_dict.setdefault(key, set())
        registered.add(agent)
427
428
def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
    """Remove agent from the set stored under each of object_ids.

    @param agent_dict: Dict mapping an object id to a set of agents.
    @param object_ids: Iterable of ids the agent was registered under.
    @param agent: The agent to unregister.
    """
    for key in object_ids:
        assert key in agent_dict
        remaining = agent_dict[key]
        remaining.remove(agent)
        # If an ID has no more active agent associated, there is no need to
        # keep it in the dictionary. Otherwise, scheduler will keep an
        # unnecessarily big dictionary until being restarted.
        if not remaining:
            del agent_dict[key]
showard170873e2009-01-07 00:22:26 +0000438
439
def add_agent_task(self, agent_task):
    """
    Wrap agent_task in a new Agent and track it in this dispatcher.

    The agent is appended to the dispatcher's agent list and indexed twice:
    under each of its host_ids and under each of its queue_entry_ids. So
    theoretically a host can have any number of agents associated with it,
    and each of them can have any special agent task, though in practice we
    never see > 1 agent/task per host at any time.

    @param agent_task: A SpecialTask for the agent to manage.
    """
    new_agent = Agent(agent_task)
    new_agent.dispatcher = self
    self._agents.append(new_agent)

    indexes = ((self._host_agents, new_agent.host_ids),
               (self._queue_entry_agents, new_agent.queue_entry_ids))
    for agent_dict, object_ids in indexes:
        self._register_agent_for_ids(agent_dict, object_ids, new_agent)
mblighd5c95802008-03-05 00:33:46 +0000460
showard170873e2009-01-07 00:22:26 +0000461
def get_agents_for_entry(self, queue_entry):
    """
    Return the agents registered for the given queue entry.

    @param queue_entry: Object whose id is looked up.
    @returns A new list of agents; empty when none are registered.
    """
    registered = self._queue_entry_agents.get(queue_entry.id, ())
    return list(registered)
showard170873e2009-01-07 00:22:26 +0000467
468
def host_has_agent(self, host):
    """
    Tell whether at least one Agent is currently registered for this host.

    @param host: Object whose id is looked up.
    @returns True or False.
    """
    registered = self._host_agents.get(host.id)
    if registered:
        return True
    return False
mbligh36768f02008-02-22 18:28:33 +0000474
475
def remove_agent(self, agent):
    """Stop tracking agent: drop it from the list and from both indexes.

    @param agent: An agent previously added to this dispatcher.
    """
    self._agents.remove(agent)

    indexes = ((self._host_agents, agent.host_ids),
               (self._queue_entry_agents, agent.queue_entry_ids))
    for agent_dict, object_ids in indexes:
        self._unregister_agent_for_ids(agent_dict, object_ids, agent)
showardec113162008-05-08 00:52:49 +0000482
483
def _host_has_scheduled_special_task(self, host):
    """Tell whether the host has a special task that has not started yet.

    @param host: Object whose id is matched against SpecialTask rows.
    @returns True if a SpecialTask exists for the host that is neither
            active nor complete.
    """
    queued_tasks = models.SpecialTask.objects.filter(host__id=host.id,
                                                     is_active=False,
                                                     is_complete=False)
    return bool(queued_tasks)
488
489
def _recover_processes(self):
    """
    Re-attach to work that was in flight before the scheduler started.

    Builds agent tasks for active queue entries and active special tasks,
    registers their pidfiles, refreshes the drone manager and then recovers
    the tasks, pending entries and remaining hosts.
    """
    agent_tasks = self._create_recovery_agent_tasks()
    # Pidfiles are registered before the refresh below.
    self._register_pidfiles(agent_tasks)
    _drone_manager.refresh()
    self._recover_tasks(agent_tasks)
    self._recover_pending_entries()
    self._check_for_unrecovered_verifying_entries()
    self._reverify_remaining_hosts()
    # reinitialize drones after killing orphaned processes, since they can
    # leave around files when they die
    _drone_manager.execute_actions()
    _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000502
showard170873e2009-01-07 00:22:26 +0000503
def _create_recovery_agent_tasks(self):
    """Collect the agent tasks that need recovering.

    @returns The queue entry agent tasks followed by the agent tasks of
            active special tasks.
    """
    queue_entry_tasks = self._get_queue_entry_agent_tasks()
    active_special_tasks = self._get_special_task_agent_tasks(is_active=True)
    return queue_entry_tasks + active_special_tasks
507
508
def _get_queue_entry_agent_tasks(self):
    """
    Get agent tasks for all hqe in the specified states.

    Loosely this translates to taking a hqe in one of the specified states,
    say parsing, and getting an AgentTask for it, like the FinalReparseTask,
    through _get_agent_task_for_queue_entry. Each queue entry can only have
    one agent task at a time, but there might be multiple queue entries in
    the group.

    As a side effect, the number of fetched entries per status is reported
    through the 'active_host_queue_entries' gauge.

    @return: A list of AgentTasks.
    """
    # host queue entry statuses handled directly by AgentTasks (Verifying is
    # handled through SpecialTasks, so is not listed here)
    statuses = (models.HostQueueEntry.Status.STARTING,
                models.HostQueueEntry.Status.RUNNING,
                models.HostQueueEntry.Status.GATHERING,
                models.HostQueueEntry.Status.PARSING,
                models.HostQueueEntry.Status.ARCHIVING)
    status_list = ','.join("'%s'" % status for status in statuses)
    queue_entries = scheduler_models.HostQueueEntry.fetch(
            where='status IN (%s)' % status_list)

    agent_tasks = []
    used_queue_entries = set()
    hqe_count_by_status = {}
    for entry in queue_entries:
        hqe_count_by_status[entry.status] = (
                hqe_count_by_status.get(entry.status, 0) + 1)
        if self.get_agents_for_entry(entry):
            # already being handled
            continue
        if entry in used_queue_entries:
            # already picked up by a synchronous job
            continue
        agent_task = self._get_agent_task_for_queue_entry(entry)
        agent_tasks.append(agent_task)
        used_queue_entries.update(agent_task.queue_entries)

    # items() rather than the Python 2-only iteritems(); the dict holds at
    # most one key per status, so the difference is irrelevant here.
    for status, count in hqe_count_by_status.items():
        metrics.Gauge(
            'chromeos/autotest/scheduler/active_host_queue_entries'
        ).set(count, fields={'status': status})

    return agent_tasks
showard170873e2009-01-07 00:22:26 +0000554
555
def _get_special_task_agent_tasks(self, is_active=False):
    """Build agent tasks for all incomplete special tasks.

    @param is_active: Only special tasks whose is_active flag equals this
            value are considered.
    @returns A list with one AgentTask per matching SpecialTask.
    """
    matching_tasks = models.SpecialTask.objects.filter(
            is_active=is_active, is_complete=False)
    agent_tasks = []
    for special_task in matching_tasks:
        agent_tasks.append(
                self._get_agent_task_for_special_task(special_task))
    return agent_tasks
561
562
def _get_agent_task_for_queue_entry(self, queue_entry):
    """
    Build the AgentTask matching the status of an active HostQueueEntry.

    @param queue_entry: a HostQueueEntry
    @return: an AgentTask to run the queue entry
    @raises scheduler_lib.SchedulerError if the entry's status is not one
            handled by an AgentTask.
    """
    task_entries = queue_entry.job.get_group_entries(queue_entry)
    self._check_for_duplicate_host_entries(task_entries)

    hqe_status = models.HostQueueEntry.Status
    status = queue_entry.status

    if status == hqe_status.STARTING or status == hqe_status.RUNNING:
        if queue_entry.is_hostless():
            return HostlessQueueTask(queue_entry=queue_entry)
        return QueueTask(queue_entries=task_entries)

    # Post-job statuses each map to one post-job task class.
    postjob_task_classes = (
            (hqe_status.GATHERING, postjob_task.GatherLogsTask),
            (hqe_status.PARSING, postjob_task.FinalReparseTask),
            (hqe_status.ARCHIVING, postjob_task.ArchiveResultsTask))
    for handled_status, task_class in postjob_task_classes:
        if status == handled_status:
            return task_class(queue_entries=task_entries)

    raise scheduler_lib.SchedulerError(
            '_get_agent_task_for_queue_entry got entry with '
            'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000588
589
def _check_for_duplicate_host_entries(self, task_entries):
    """Assert that no host used by task_entries already has an agent.

    Entries without a host, and entries in Parsing or Archiving status, are
    not checked.

    @param task_entries: Iterable of HostQueueEntry objects.
    """
    non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                         models.HostQueueEntry.Status.ARCHIVING)
    for task_entry in task_entries:
        if task_entry.host is None:
            continue
        if task_entry.status in non_host_statuses:
            continue
        self._assert_host_has_no_agent(task_entry)
598
599
def _assert_host_has_no_agent(self, entry):
    """
    Raise if the entry's host already has an agent registered.

    @param entry: a HostQueueEntry or a SpecialTask
    @raises scheduler_lib.SchedulerError naming one of the host's agents.
    """
    if not self.host_has_agent(entry.host):
        return

    # Any one of the registered agents is enough for the error message.
    existing_agent = next(iter(self._host_agents.get(entry.host.id)))
    raise scheduler_lib.SchedulerError(
            'While scheduling %s, host %s already has a host agent %s'
            % (entry, entry.host, existing_agent.task))
609
610
def _get_agent_task_for_special_task(self, special_task):
    """
    Construct an AgentTask class to run the given SpecialTask.

    The task's host must not already have an agent. The AgentTask class is
    chosen by matching its TASK_TYPE against special_task.task. This method
    only builds the task; it does not add it to the dispatcher.

    @param special_task: a models.SpecialTask instance
    @returns an AgentTask to run this SpecialTask
    @raises scheduler_lib.SchedulerError if the host already has an agent
            or no AgentTask class handles the task's type.
    """
    self._assert_host_has_no_agent(special_task)

    special_agent_task_classes = (prejob_task.CleanupTask,
                                  prejob_task.VerifyTask,
                                  prejob_task.RepairTask,
                                  prejob_task.ResetTask,
                                  prejob_task.ProvisionTask)

    for agent_task_class in special_agent_task_classes:
        if agent_task_class.TASK_TYPE == special_task.task:
            return agent_task_class(task=special_task)

    # Format the task into the message; passing it as a second positional
    # argument would leave the exception with a tuple of args and no
    # readable message.
    raise scheduler_lib.SchedulerError(
            'No AgentTask class for task %s' % special_task)
showardd1195652009-12-08 22:21:02 +0000641
642
643 def _register_pidfiles(self, agent_tasks):
644 for agent_task in agent_tasks:
645 agent_task.register_necessary_pidfiles()
646
647
648 def _recover_tasks(self, agent_tasks):
649 orphans = _drone_manager.get_orphaned_autoserv_processes()
650
651 for agent_task in agent_tasks:
652 agent_task.recover()
653 if agent_task.monitor and agent_task.monitor.has_process():
654 orphans.discard(agent_task.monitor.get_process())
655 self.add_agent_task(agent_task)
656
657 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000658
659
showard8cc058f2009-09-08 16:26:33 +0000660 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000661 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
662 % status):
showard0db3d432009-10-12 20:29:15 +0000663 if entry.status == status and not self.get_agents_for_entry(entry):
664 # The status can change during iteration, e.g., if job.run()
665 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000666 yield entry
667
668
showard6878e8b2009-07-20 22:37:45 +0000669 def _check_for_remaining_orphan_processes(self, orphans):
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800670 m = 'chromeos/autotest/errors/unrecovered_orphan_processes'
671 metrics.Gauge(m).set(len(orphans))
672
showard6878e8b2009-07-20 22:37:45 +0000673 if not orphans:
674 return
675 subject = 'Unrecovered orphan autoserv processes remain'
676 message = '\n'.join(str(process) for process in orphans)
mbligh5fa9e112009-08-03 16:46:06 +0000677 die_on_orphans = global_config.global_config.get_config_value(
678 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
679
680 if die_on_orphans:
681 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000682
showard170873e2009-01-07 00:22:26 +0000683
showard8cc058f2009-09-08 16:26:33 +0000684 def _recover_pending_entries(self):
685 for entry in self._get_unassigned_entries(
686 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000687 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000688 entry.on_pending()
689
690
showardb8900452009-10-12 20:31:01 +0000691 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000692 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000693 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
694 unrecovered_hqes = []
695 for queue_entry in queue_entries:
696 special_tasks = models.SpecialTask.objects.filter(
697 task__in=(models.SpecialTask.Task.CLEANUP,
698 models.SpecialTask.Task.VERIFY),
699 queue_entry__id=queue_entry.id,
700 is_complete=False)
701 if special_tasks.count() == 0:
702 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000703
showardb8900452009-10-12 20:31:01 +0000704 if unrecovered_hqes:
705 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700706 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000707 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000708 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000709
710
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800711 @_calls_log_tick_msg
showard65db3932009-10-28 19:54:35 +0000712 def _schedule_special_tasks(self):
713 """
714 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700715
716 Special tasks include PreJobTasks like verify, reset and cleanup.
717 They are created through _schedule_new_jobs and associated with a hqe
718 This method translates SpecialTasks to the appropriate AgentTask and
719 adds them to the dispatchers agents list, so _handle_agents can execute
720 them.
showard65db3932009-10-28 19:54:35 +0000721 """
Prashanth B4ec98672014-05-15 10:44:54 -0700722 # When the host scheduler is responsible for acquisition we only want
723 # to run tasks with leased hosts. All hqe tasks will already have
724 # leased hosts, and we don't want to run frontend tasks till the host
725 # scheduler has vetted the assignment. Note that this doesn't include
726 # frontend tasks with hosts leased by other active hqes.
727 for task in self._job_query_manager.get_prioritized_special_tasks(
Prathmesh Prabhu468c32c2016-12-20 15:12:30 -0800728 only_tasks_with_leased_hosts=not self._inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000729 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000730 continue
showardd1195652009-12-08 22:21:02 +0000731 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000732
733
showard170873e2009-01-07 00:22:26 +0000734 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000735 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000736 # should never happen
showarded2afea2009-07-07 20:54:07 +0000737 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000738 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000739 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700740 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000741 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000742
743
jadmanski0afbb632008-06-06 21:10:57 +0000744 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000745 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700746 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000747 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000748 if self.host_has_agent(host):
749 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000750 continue
showard8cc058f2009-09-08 16:26:33 +0000751 if self._host_has_scheduled_special_task(host):
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700752 # host will have a special task scheduled on the next tick
showard8cc058f2009-09-08 16:26:33 +0000753 continue
showard170873e2009-01-07 00:22:26 +0000754 if print_message:
showardb18134f2009-03-20 20:52:18 +0000755 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000756 models.SpecialTask.objects.create(
757 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000758 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000759
760
jadmanski0afbb632008-06-06 21:10:57 +0000761 def _recover_hosts(self):
762 # recover "Repair Failed" hosts
763 message = 'Reverifying dead host %s'
764 self._reverify_hosts_where("status = 'Repair Failed'",
765 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000766
767
showard89f84db2009-03-12 20:39:13 +0000768 def _refresh_pending_queue_entries(self):
769 """
770 Lookup the pending HostQueueEntries and call our HostScheduler
771 refresh() method given that list. Return the list.
772
773 @returns A list of pending HostQueueEntries sorted in priority order.
774 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700775 queue_entries = self._job_query_manager.get_pending_queue_entries(
Prathmesh Prabhu468c32c2016-12-20 15:12:30 -0800776 only_hostless=not self._inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000777 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000778 return []
showard89f84db2009-03-12 20:39:13 +0000779 return queue_entries
780
781
showarda9545c02009-12-18 22:44:26 +0000782 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800783 """Schedule a hostless (suite) job.
784
785 @param queue_entry: The queue_entry representing the hostless job.
786 """
showarda9545c02009-12-18 22:44:26 +0000787 self.add_agent_task(HostlessQueueTask(queue_entry))
Jakob Juelichd615a1e2014-09-04 11:48:36 -0700788
789 # Need to set execution_subdir before setting the status:
790 # After a restart of the scheduler, agents will be restored for HQEs in
791 # Starting, Running, Gathering, Parsing or Archiving. To do this, the
792 # execution_subdir is needed. Therefore it must be set before entering
793 # one of these states.
794 # Otherwise, if the scheduler was interrupted between setting the status
795 # and the execution_subdir, upon it's restart restoring agents would
796 # fail.
797 # Is there a way to get a status in one of these states without going
798 # through this code? Following cases are possible:
799 # - If it's aborted before being started:
800 # active bit will be 0, so there's nothing to parse, it will just be
801 # set to completed by _find_aborting. Critical statuses are skipped.
802 # - If it's aborted or it fails after being started:
803 # It was started, so this code was executed.
804 queue_entry.update_field('execution_subdir', 'hostless')
jamesren47bd7372010-03-13 00:58:17 +0000805 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000806
807
beepscc9fc702013-12-02 12:45:38 -0800808 def _schedule_host_job(self, host, queue_entry):
809 """Schedules a job on the given host.
810
811 1. Assign the host to the hqe, if it isn't already assigned.
812 2. Create a SpecialAgentTask for the hqe.
813 3. Activate the hqe.
814
815 @param queue_entry: The job to schedule.
816 @param host: The host to schedule the job on.
817 """
818 if self.host_has_agent(host):
819 host_agent_task = list(self._host_agents.get(host.id))[0].task
beepscc9fc702013-12-02 12:45:38 -0800820 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700821 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800822
823
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800824 @_calls_log_tick_msg
showard89f84db2009-03-12 20:39:13 +0000825 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700826 """
827 Find any new HQEs and call schedule_pre_job_tasks for it.
828
829 This involves setting the status of the HQE and creating a row in the
830 db corresponding the the special task, through
831 scheduler_models._queue_special_task. The new db row is then added as
832 an agent to the dispatcher through _schedule_special_tasks and
833 scheduled for execution on the drone through _handle_agents.
834 """
showard89f84db2009-03-12 20:39:13 +0000835 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000836
beepscc9fc702013-12-02 12:45:38 -0800837 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700838 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700839 new_jobs_with_hosts = 0
840 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800841 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700842 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000843
beepscc9fc702013-12-02 12:45:38 -0800844 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000845 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000846 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700847 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000848 else:
beepscc9fc702013-12-02 12:45:38 -0800849 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700850 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700851
Prathmesh Prabhued7ece92016-11-23 11:19:43 -0800852 metrics.Counter(
853 'chromeos/autotest/scheduler/scheduled_jobs_hostless'
854 ).increment_by(new_hostless_jobs)
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800855
beepscc9fc702013-12-02 12:45:38 -0800856 if not host_jobs:
857 return
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800858
Prathmesh Prabhu468c32c2016-12-20 15:12:30 -0800859 if not self._inline_host_acquisition:
Prathmesh Prabhu3c6a3bf2016-12-20 11:29:02 -0800860 # In this case, host_scheduler is responsible for scheduling
861 # host_jobs. Scheduling the jobs ourselves can lead to DB corruption
862 # since host_scheduler assumes it is the single process scheduling
863 # host jobs.
864 metrics.Gauge(
865 'chromeos/autotest/errors/scheduler/unexpected_host_jobs').set(
866 len(host_jobs))
867 return
868
Prashanth Bf66d51b2014-05-06 12:42:25 -0700869 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
870 for host_assignment in jobs_with_hosts:
871 self._schedule_host_job(host_assignment.host, host_assignment.job)
872 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800873
Prathmesh Prabhued7ece92016-11-23 11:19:43 -0800874 metrics.Counter(
875 'chromeos/autotest/scheduler/scheduled_jobs_with_hosts'
876 ).increment_by(new_jobs_with_hosts)
877 # TODO(pprabhu): Decide what to do about this metric. Million dollar
878 # question: What happens to jobs that were not matched. Do they stay in
879 # the queue, and get processed right here in the next tick (then we want
880 # a guage corresponding to the number of outstanding unmatched host
881 # jobs), or are they handled somewhere else (then we need a counter
882 # corresponding to failed_to_match_with_hosts jobs).
883 #autotest_stats.Gauge(key).send('new_jobs_without_hosts',
884 # new_jobs_need_hosts -
885 # new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000886
887
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800888 @_calls_log_tick_msg
showard8cc058f2009-09-08 16:26:33 +0000889 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700890 """
891 Adds agents to the dispatcher.
892
893 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
894 QueueTask for example, will have a job with a control file, and
895 the agent will have methods that poll, abort and check if the queue
896 task is finished. The dispatcher runs the agent_task, as well as
897 other agents in it's _agents member, through _handle_agents, by
898 calling the Agents tick().
899
900 This method creates an agent for each HQE in one of (starting, running,
901 gathering, parsing, archiving) states, and adds it to the dispatcher so
902 it is handled by _handle_agents.
903 """
showardd1195652009-12-08 22:21:02 +0000904 for agent_task in self._get_queue_entry_agent_tasks():
905 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000906
907
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800908 @_calls_log_tick_msg
jadmanski0afbb632008-06-06 21:10:57 +0000909 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700910 """
911 Looks through the afe_host_queue_entries for an aborted entry.
912
913 The aborted bit is set on an HQE in many ways, the most common
914 being when a user requests an abort through the frontend, which
915 results in an rpc from the afe to abort_host_queue_entries.
916 """
jamesrene7c65cb2010-06-08 20:38:10 +0000917 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000918 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700919 where='aborted=1 and complete=0'):
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800920
921 # If the job is running on a shard, let the shard handle aborting
922 # it and sync back the right status.
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -0800923 if entry.job.shard_id is not None and not server_utils.is_shard():
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800924 logging.info('Waiting for shard %s to abort hqe %s',
925 entry.job.shard_id, entry)
926 continue
927
showardf4a2e502009-07-28 20:06:39 +0000928 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800929
930 # The task would have started off with both is_complete and
931 # is_active = False. Aborted tasks are neither active nor complete.
932 # For all currently active tasks this will happen through the agent,
933 # but we need to manually update the special tasks that haven't
934 # started yet, because they don't have agents.
935 models.SpecialTask.objects.filter(is_active=False,
936 queue_entry_id=entry.id).update(is_complete=True)
937
showardd3dc1992009-04-22 21:01:40 +0000938 for agent in self.get_agents_for_entry(entry):
939 agent.abort()
940 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000941 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700942 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000943 for job in jobs_to_stop:
944 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000945
946
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800947 @_calls_log_tick_msg
beeps8bb1f7d2013-08-05 01:30:09 -0700948 def _find_aborted_special_tasks(self):
949 """
950 Find SpecialTasks that have been marked for abortion.
951
952 Poll the database looking for SpecialTasks that are active
953 and have been marked for abortion, then abort them.
954 """
955
956 # The completed and active bits are very important when it comes
957 # to scheduler correctness. The active bit is set through the prolog
958 # of a special task, and reset through the cleanup method of the
959 # SpecialAgentTask. The cleanup is called both through the abort and
960 # epilog. The complete bit is set in several places, and in general
961 # a hanging job will have is_active=1 is_complete=0, while a special
962 # task which completed will have is_active=0 is_complete=1. To check
963 # aborts we directly check active because the complete bit is set in
964 # several places, including the epilog of agent tasks.
965 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
966 is_aborted=True)
967 for task in aborted_tasks:
968 # There are 2 ways to get the agent associated with a task,
969 # through the host and through the hqe. A special task
970 # always needs a host, but doesn't always need a hqe.
971 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700972 if isinstance(agent.task, agent_task.SpecialAgentTask):
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000973
beeps8bb1f7d2013-08-05 01:30:09 -0700974 # The epilog preforms critical actions such as
975 # queueing the next SpecialTask, requeuing the
976 # hqe etc, however it doesn't actually kill the
977 # monitor process and set the 'done' bit. Epilogs
978 # assume that the job failed, and that the monitor
979 # process has already written an exit code. The
980 # done bit is a necessary condition for
981 # _handle_agents to schedule any more special
982 # tasks against the host, and it must be set
983 # in addition to is_active, is_complete and success.
984 agent.task.epilog()
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000985 agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700986
987
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700988 def _can_start_agent(self, agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000989 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000990 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000991 return True
992 # don't allow any nonzero-process agents to run after we've reached a
993 # limit (this avoids starvation of many-process agents)
994 if have_reached_limit:
995 return False
996 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000997 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000998 agent.task.owner_username,
999 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +00001000 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001001 return False
showard4c5374f2008-09-04 17:02:56 +00001002 return True
1003
1004
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -08001005 @_calls_log_tick_msg
jadmanski0afbb632008-06-06 21:10:57 +00001006 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -07001007 """
1008 Handles agents of the dispatcher.
1009
1010 Appropriate Agents are added to the dispatcher through
1011 _schedule_running_host_queue_entries. These agents each
1012 have a task. This method runs the agents task through
1013 agent.tick() leading to:
1014 agent.start
1015 prolog -> AgentTasks prolog
1016 For each queue entry:
1017 sets host status/status to Running
1018 set started_on in afe_host_queue_entries
1019 run -> AgentTasks run
1020 Creates PidfileRunMonitor
1021 Queues the autoserv command line for this AgentTask
1022 via the drone manager. These commands are executed
1023 through the drone managers execute actions.
1024 poll -> AgentTasks/BaseAgentTask poll
1025 checks the monitors exit_code.
1026 Executes epilog if task is finished.
1027 Executes AgentTasks _finish_task
1028 finish_task is usually responsible for setting the status
1029 of the HQE/host, and updating it's active and complete fileds.
1030
1031 agent.is_done
1032 Removed the agent from the dispatchers _agents queue.
1033 Is_done checks the finished bit on the agent, that is
1034 set based on the Agents task. During the agents poll
1035 we check to see if the monitor process has exited in
1036 it's finish method, and set the success member of the
1037 task based on this exit code.
1038 """
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001039 num_started_this_tick = 0
1040 num_finished_this_tick = 0
showard4c5374f2008-09-04 17:02:56 +00001041 have_reached_limit = False
1042 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -07001043 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +00001044 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -07001045 self._log_extra_msg('Processing Agent with Host Ids: %s and '
1046 'queue_entry ids:%s' % (agent.host_ids,
1047 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +00001048 if not agent.started:
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001049 if not self._can_start_agent(agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001050 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -07001051 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +00001052 continue
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001053 num_started_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001054 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +00001055 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -07001056 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +00001057 if agent.is_done():
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001058 num_finished_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001059 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +00001060 self.remove_agent(agent)
Prathmesh Prabhued7ece92016-11-23 11:19:43 -08001061
1062 metrics.Counter(
1063 'chromeos/autotest/scheduler/agent_processes_started'
1064 ).increment_by(num_started_this_tick)
1065 metrics.Counter(
1066 'chromeos/autotest/scheduler/agent_processes_finished'
1067 ).increment_by(num_finished_this_tick)
1068 num_agent_processes = _drone_manager.total_running_processes()
1069 metrics.Gauge(
1070 'chromeos/autotest/scheduler/agent_processes'
1071 ).set(num_agent_processes)
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001072 logging.info('%d running processes. %d added this tick.',
Prathmesh Prabhued7ece92016-11-23 11:19:43 -08001073 num_agent_processes, num_started_this_tick)
mbligh36768f02008-02-22 18:28:33 +00001074
1075
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -08001076 def _log_tick_msg(self, msg):
1077 if self._tick_debug:
1078 logging.debug(msg)
1079
1080
1081 def _log_extra_msg(self, msg):
1082 if self._extra_debugging:
1083 logging.debug(msg)
1084
1085
# Look up the 'SiteDispatcher' class in
# autotest_lib.scheduler.site_monitor_db, passing BaseDispatcher as the
# baseline class.
# NOTE(review): presumably BaseDispatcher is what gets used when no site
# module is present - confirm against utils.import_site_class.
SiteDispatcher = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    """The dispatcher class; adds nothing on top of SiteDispatcher."""
    pass
1092
1093
class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.  An agent wraps
    around an AgentTask mainly to associate the AgentTask with the queue_entry
    and host ids.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested.  The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: An instance of an AgentTask.
        """
        self.task = task

        # This is filled in by Dispatcher.add_agent()
        self.dispatcher = None

        # Mirror the ids of the wrapped task on the agent itself.
        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        """Mark the agent started and poll its task until it is done."""
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        """@returns True once the task finished or honoured an abort."""
        return self.finished


    def abort(self):
        """Forward an abort request to the task, if there is one."""
        if not self.task:
            return
        self.task.abort()
        # tasks can choose to ignore aborts
        if self.task.aborted:
            self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001149
showardd3dc1992009-04-22 21:01:40 +00001150
beeps5e2bb4a2013-10-28 11:26:45 -07001151class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001152 """
1153 Common functionality for QueueTask and HostlessQueueTask
1154 """
1155 def __init__(self, queue_entries):
1156 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001157 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001158 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001159
1160
showard73ec0442009-02-07 02:05:20 +00001161 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001162 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001163
1164
jamesrenc44ae992010-02-19 00:12:54 +00001165 def _write_control_file(self, execution_path):
1166 control_path = _drone_manager.attach_file_to_execution(
1167 execution_path, self.job.control_file)
1168 return control_path
1169
1170
Aviv Keshet308e7362013-05-21 14:43:16 -07001171 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001172 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001173 execution_path = self.queue_entries[0].execution_path()
1174 control_path = self._write_control_file(execution_path)
1175 hostnames = ','.join(entry.host.hostname
1176 for entry in self.queue_entries
1177 if not entry.is_hostless())
1178
1179 execution_tag = self.queue_entries[0].execution_tag()
1180 params = _autoserv_command_line(
1181 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001182 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001183 _drone_manager.absolute_path(control_path)],
1184 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001185 if self.job.is_image_update_job():
1186 params += ['--image', self.job.update_image_path]
1187
jamesrenc44ae992010-02-19 00:12:54 +00001188 return params
showardd1195652009-12-08 22:21:02 +00001189
1190
1191 @property
1192 def num_processes(self):
1193 return len(self.queue_entries)
1194
1195
1196 @property
1197 def owner_username(self):
1198 return self.job.owner
1199
1200
1201 def _working_directory(self):
1202 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001203
1204
jadmanski0afbb632008-06-06 21:10:57 +00001205 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001206 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001207 keyval_dict = self.job.keyval_dict()
1208 keyval_dict[queued_key] = queued_time
showardf1ae3542009-05-11 19:26:02 +00001209 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001210 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001211 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001212 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001213
1214
showard35162b02009-03-03 02:17:30 +00001215 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001216 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001217 _drone_manager.write_lines_to_file(error_file_path,
1218 [_LOST_PROCESS_ERROR])
1219
1220
showardd3dc1992009-04-22 21:01:40 +00001221 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001222 if not self.monitor:
1223 return
1224
showardd9205182009-04-27 20:09:55 +00001225 self._write_job_finished()
1226
showard35162b02009-03-03 02:17:30 +00001227 if self.monitor.lost_process:
1228 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001229
jadmanskif7fa2cc2008-10-01 14:13:23 +00001230
def _write_status_comment(self, comment):
    """Write an INFO line containing `comment` to the job's status.log.

    The line goes to `status.log` in the working directory via the drone
    manager, with the write paired to the monitor's process.

    @param comment: Text placed in the last tab-separated field of the line.
    """
    status_log = os.path.join(self._working_directory(), 'status.log')
    info_line = 'INFO\t----\t----\t' + comment
    _drone_manager.write_lines_to_file(
            status_log,
            [info_line],
            paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001236
1237
def _log_abort(self):
    """Record who aborted the job, and when, in keyvals and status.log.

    Does nothing unless a monitor exists and it has a process.  Otherwise
    writes the `aborted_by` and `aborted_on` keyvals and a matching status
    comment.  When no queue entry names an aborter, the abort is attributed
    to 'autotest_system' at the current time.
    """
    if not (self.monitor and self.monitor.has_process()):
        return

    # Gather every distinct aborter and abort timestamp (epoch seconds)
    # across the queue entries that record an aborter.
    abort_users = set()
    abort_times = set()
    for entry in self.queue_entries:
        if not entry.aborted_by:
            continue
        abort_users.add(entry.aborted_by)
        abort_times.add(int(time.mktime(entry.aborted_on.timetuple())))

    # Pick the single aborter, if any, and the latest abort time.
    # TODO(showard): this conditional is now obsolete, we just need to leave
    # it in temporarily for backwards compatibility over upgrades. delete
    # soon.
    assert len(abort_users) <= 1
    if len(abort_users) != 1:
        aborted_by_value = 'autotest_system'
        aborted_on_value = int(time.time())
    else:
        aborted_by_value = abort_users.pop()
        aborted_on_value = max(abort_times)

    self._write_keyval_after_job("aborted_by", aborted_by_value)
    self._write_keyval_after_job("aborted_on", aborted_on_value)

    aborted_at = datetime.datetime.fromtimestamp(aborted_on_value)
    aborted_on_string = str(aborted_at)
    message = 'Job aborted by %s on %s' % (aborted_by_value,
                                           aborted_on_string)
    self._write_status_comment(message)
jadmanskic2ac77f2008-05-16 21:44:04 +00001269
1270
def abort(self):
    """Abort through the parent class, then log the abort and finish up."""
    super(AbstractQueueTask, self).abort()
    # Abort details are logged before the finish-task bookkeeping runs.
    self._log_abort()
    self._finish_task()
showard21baa452008-10-21 00:08:39 +00001275
1276
def epilog(self):
    """Run the parent class epilog, then the finish-task bookkeeping."""
    super(AbstractQueueTask, self).epilog()
    self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001280
1281
class QueueTask(AbstractQueueTask):
    """Queue task whose queue entries each carry a host."""

    def __init__(self, queue_entries):
        """Set up the task and cache the SSP-container config switch.

        @param queue_entries: Queue entries this task is responsible for.
        """
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)
        config = global_config.global_config
        self._enable_ssp_container = config.get_config_value(
                'AUTOSERV', 'enable_ssp_container', type=bool, default=True)


    def prolog(self):
        """Validate entry/host statuses, then mark the hosts as running.

        After the status check and the parent prolog, each entry's host has
        its keyvals written, its status set to RUNNING and its `dirty`
        field set to 1.
        """
        hqe_statuses = models.HostQueueEntry.Status
        host_statuses = models.Host.Status
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(hqe_statuses.STARTING,
                                      hqe_statuses.RUNNING),
                allowed_host_statuses=(host_statuses.PENDING,
                                       host_statuses.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            host = entry.host
            self._write_host_keyvals(host)
            host.set_status(host_statuses.RUNNING)
            host.update_field('dirty', 1)


    def _finish_task(self):
        """Run the parent bookkeeping, then move every entry to GATHERING.

        Each entry's host is set to status RUNNING as well.
        """
        super(QueueTask, self)._finish_task()

        gathering = models.HostQueueEntry.Status.GATHERING
        host_running = models.Host.Status.RUNNING
        for entry in self.queue_entries:
            entry.set_status(gathering)
            entry.host.set_status(host_running)


    def _command_line(self):
        """Return the parent command line plus QueueTask-specific flags.

        The result always ends with '--verify_job_repo_url'.  The SSP and
        parent-job flags are added only when their conditions hold.
        """
        invocation = super(QueueTask, self)._command_line()

        # Decide whether server-side packaging is needed.  `!= False` is
        # kept on purpose: unlike a truthiness test it lets None through.
        # NOTE(review): presumably None means "not specified" -- confirm.
        is_server_job = (
                self.job.control_type == control_data.CONTROL_TYPE.SERVER)
        wants_ssp = (self._enable_ssp_container and
                     is_server_job and
                     self.job.require_ssp != False)
        if wants_ssp:
            invocation.append('--require-ssp')
            # NOTE(review): nesting reconstructed from a view without
            # indentation; confirm test_source_build is only passed
            # together with --require-ssp.
            test_source_build = self.job.keyval_dict().get(
                    'test_source_build', None)
            if test_source_build:
                invocation.extend(['--test_source_build', test_source_build])

        if self.job.parent_job_id:
            invocation.extend(
                    ['--parent_job_id', str(self.job.parent_job_id)])

        return invocation + ['--verify_job_repo_url']
Alex Miller9f01d5d2013-08-08 02:26:01 -07001330
1331
class HostlessQueueTask(AbstractQueueTask):
    """Queue task built around a single (hostless) queue entry."""

    def __init__(self, queue_entry):
        """Wrap one queue entry and record its id.

        @param queue_entry: The only queue entry this task handles.
        """
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        """Run the parent prolog unchanged."""
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        """Run the parent bookkeeping, then move a started entry to PARSING.

        An entry still in status STARTING is left untouched.
        """
        super(HostlessQueueTask, self)._finish_task()

        # A job enters the database in status Starting.  On each tick the
        # scheduler looks at all Starting jobs and starts the ones it can;
        # if it hits a limit (e.g., max_hostless_jobs_per_drone) the rest
        # stay in Starting.  Jobs that do start move to Running and get an
        # autoserv process on a drone.
        # So an entry that is still Starting never had a process, and there
        # are no logs to parse or collect.  Without this check the scheduler
        # would raise an exception, because execution_subdir has no value
        # yet for such an entry.
        entry = self.queue_entries[0]
        statuses = models.HostQueueEntry.Status
        if entry.status == statuses.STARTING:
            return
        entry.set_status(statuses.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001359
1360
# Call main() only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()