blob: 114e959a0dbf37a994c11a25e9ddb5cf375356b5 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Richard Barnetteffed1722016-05-18 15:57:22 -07002
3#pylint: disable=C0111
4
mbligh36768f02008-02-22 18:28:33 +00005"""
6Autotest scheduler
7"""
showard909c7a62008-07-15 21:52:38 +00008
Dan Shif6c65bd2014-08-29 16:15:07 -07009import datetime
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -080010import functools
Dan Shif6c65bd2014-08-29 16:15:07 -070011import gc
12import logging
13import optparse
14import os
15import signal
16import sys
17import time
showard402934a2009-12-21 22:20:47 +000018
Alex Miller05d7b4c2013-03-04 07:49:38 -080019import common
showard21baa452008-10-21 00:08:39 +000020from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000021
22import django.db
23
Dan Shiec1d47d2015-02-13 11:38:13 -080024from autotest_lib.client.common_lib import control_data
Prashanth B0e960282014-05-13 19:38:28 -070025from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070026from autotest_lib.client.common_lib import utils
xixuan4de4e742017-01-30 13:31:35 -080027from autotest_lib.frontend.afe import models
Fang Dengc330bee2014-10-21 18:10:55 -070028from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070029from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
30from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070031from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070032from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070033from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000034from autotest_lib.scheduler import scheduler_models
Aviv Keshet6b4e3c22017-05-04 20:14:02 -070035from autotest_lib.scheduler import scheduler_config
Aviv Keshet308e7362013-05-21 14:43:16 -070036from autotest_lib.server import autoserv_utils
Dan Shi114e1722016-01-10 18:12:53 -080037from autotest_lib.server import system_utils
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080038from autotest_lib.server import utils as server_utils
Dan Shicf2e8dd2015-05-07 17:18:48 -070039from autotest_lib.site_utils import metadata_reporter
Dan Shib9144a42014-12-01 16:09:32 -080040from autotest_lib.site_utils import server_manager_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080041
Dan Shi5e2efb72017-02-07 11:40:23 -080042try:
43 from chromite.lib import metrics
44 from chromite.lib import ts_mon_config
45except ImportError:
46 metrics = utils.metrics_mock
47 ts_mon_config = utils.metrics_mock
48
Dan Shicf2e8dd2015-05-07 17:18:48 -070049
# Pid-file prefixes used by utils.write_pid / program_is_alive.
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Default results directory; overwritten with the command-line argument in
# main_without_exception_handling().
RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the environment to override the autotest root directory.
# (Was `os.environ.has_key(...)`: dict.has_key is deprecated and removed in
# Python 3; the `in` operator is equivalent on both 2 and 3.)
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Populated in initialize(); module-level so signal handlers and helpers can
# reach them.
_db_manager = None
_db = None
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None
Prashanth Bf66d51b2014-05-06 12:42:25 -070080
mbligh36768f02008-02-22 18:28:33 +000081
mbligh83c1e9e2009-05-01 23:10:41 +000082def _site_init_monitor_db_dummy():
83 return {}
84
85
def _verify_default_drone_set_exists():
    """Raise SchedulerError when drone sets are on but no default is named."""
    sets_enabled = models.DroneSet.drone_sets_enabled()
    default_name = models.DroneSet.default_drone_set_name()
    if sets_enabled and not default_name:
        raise scheduler_lib.SchedulerError(
                'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000091
92
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    # Currently the only startup consistency check; add further checks here.
    _verify_default_drone_set_exists()
96
97
def main():
    """Top-level entry point.

    Runs the scheduler, logs any exception that escapes (SystemExit is
    re-raised untouched), and always removes this process's pid file on
    the way out.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000109
110
def main_without_exception_handling():
    """Parse arguments, configure the process, and run the scheduler loop.

    Expects exactly one positional argument: the results directory. Exits
    early (status 1) if the scheduler is disabled in the global config.
    Runs dispatcher ticks until a signal sets the module-level _shutdown
    flag, then flushes emails and disconnects cleanly.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    # Refuses a non-local database unless --production was passed.
    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Run the site-specific init hook if one exists; falls back to the
    # no-op dummy above.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # Start the thread to report metadata.
    metadata_reporter.start()

    with ts_mon_config.SetupTsMonGlobalState('autotest_scheduler',
                                             indirect=True):
        try:
            initialize()
            dispatcher = Dispatcher()
            dispatcher.initialize(recover_hosts=options.recover_hosts)
            minimum_tick_sec = global_config.global_config.get_config_value(
                    scheduler_config.CONFIG_SECTION, 'minimum_tick_sec',
                    type=float)

            # Main loop: one tick per iteration, padded with sleep so each
            # tick takes at least minimum_tick_sec of wall time.
            while not _shutdown:
                start = time.time()
                dispatcher.tick()
                curr_tick_sec = time.time() - start
                if minimum_tick_sec > curr_tick_sec:
                    time.sleep(minimum_tick_sec - curr_tick_sec)
                else:
                    time.sleep(0.0001)
        except server_manager_utils.ServerActionError as e:
            # This error is expected when the server is not in primary status
            # for scheduler role. Thus do not send email for it.
            logging.exception(e)
        except Exception:
            metrics.Counter('chromeos/autotest/scheduler/uncaught_exception'
                            ).increment()

    # Orderly shutdown: stop metadata thread, flush emails, release drones
    # and the database connection.
    metadata_reporter.abort()
    email_manager.manager.send_queued_emails()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000200
201
def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler: ask the main loop to exit after this tick."""
    logging.info("Shutdown request received.")
    global _shutdown
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000206
207
def initialize():
    """One-time scheduler startup.

    Claims the pid file (exiting if another monitor_db is alive), connects
    to the database, installs signal handlers, and initializes the drone
    manager and scheduler models.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start a second scheduler against the same pid file.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point the scheduler at the stress-test database instead of prod.
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    # If server database is enabled, check if the server has role `scheduler`.
    # If the server does not have scheduler role, exception will be raised and
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    # Tell the drone manager which drones to use and where results are
    # aggregated.
    drone_list = system_utils.get_drones()
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000247
248
def initialize_globals():
    # Bind the module-level _drone_manager to the process-wide singleton.
    global _drone_manager
    _drone_manager = drone_manager.instance()
252
253
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    return autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose, in_lab=True)
showard87ba02a2009-04-20 19:37:32 +0000272
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800273def _calls_log_tick_msg(func):
274 """Used to trace functions called by BaseDispatcher.tick."""
275 @functools.wraps(func)
276 def wrapper(self, *args, **kwargs):
277 self._log_tick_msg('Starting %s' % func.__name__)
278 return func(self, *args, **kwargs)
279
280 return wrapper
281
showard87ba02a2009-04-20 19:37:32 +0000282
Simran Basia858a232012-08-21 11:04:37 -0700283class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800284
285
    def __init__(self):
        # Active Agent objects owned by this dispatcher.
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
            _db, _drone_manager)
        # Reverse indexes: host id -> set of agents, queue entry id -> set
        # of agents. Maintained by add_agent_task/remove_agent.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        # Throttling state for _garbage_collection (stats logged at most
        # once per gc_stats_interval_mins).
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        # Debug-logging switches consumed by _log_tick_msg and friends.
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)
        self._inline_host_acquisition = (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'inline_host_acquisition', type=bool, default=True))

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if self._inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
321
mbligh36768f02008-02-22 18:28:33 +0000322
    def initialize(self, recover_hosts=True):
        """Prepare the dispatcher: flush cleanup actions, then recover state.

        @param recover_hosts: If True, also attempt dead-host recovery.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()
        # Execute all actions queued in the cleanup tasks. Scheduler tick will
        # run a refresh task first. If there is any action in the queue, refresh
        # will raise an exception.
        _drone_manager.execute_actions()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000336
337
    # TODO(pprabhu) Drop this metric once tick_times has been verified.
    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/scheduler/tick_durations/tick')
    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.

        Step ordering matters: see the comment below about _run_cleanup
        needing to fall between sync_refresh and execute_actions.
        """
        with metrics.RuntimeBreakdownTimer(
            'chromeos/autotest/scheduler/tick_times') as breakdown_timer:
            self._log_tick_msg('New tick')
            system_utils.DroneCache.refresh()

            with breakdown_timer.Step('garbage_collection'):
                self._garbage_collection()
            with breakdown_timer.Step('trigger_refresh'):
                self._log_tick_msg('Starting _drone_manager.trigger_refresh')
                _drone_manager.trigger_refresh()
            with breakdown_timer.Step('schedule_running_host_queue_entries'):
                self._schedule_running_host_queue_entries()
            with breakdown_timer.Step('schedule_special_tasks'):
                self._schedule_special_tasks()
            with breakdown_timer.Step('schedule_new_jobs'):
                self._schedule_new_jobs()
            with breakdown_timer.Step('sync_refresh'):
                self._log_tick_msg('Starting _drone_manager.sync_refresh')
                _drone_manager.sync_refresh()
            # _run_cleanup must be called between drone_manager.sync_refresh,
            # and drone_manager.execute_actions, as sync_refresh will clear the
            # calls queued in drones. Therefore, any action that calls
            # drone.queue_call to add calls to the drone._calls, should be after
            # drone refresh is completed and before
            # drone_manager.execute_actions at the end of the tick.
            with breakdown_timer.Step('run_cleanup'):
                self._run_cleanup()
            with breakdown_timer.Step('find_aborting'):
                self._find_aborting()
            with breakdown_timer.Step('find_aborted_special_tasks'):
                self._find_aborted_special_tasks()
            with breakdown_timer.Step('handle_agents'):
                self._handle_agents()
            with breakdown_timer.Step('host_scheduler_tick'):
                self._log_tick_msg('Starting _host_scheduler.tick')
                self._host_scheduler.tick()
            with breakdown_timer.Step('drones_execute_actions'):
                self._log_tick_msg('Starting _drone_manager.execute_actions')
                _drone_manager.execute_actions()
            with breakdown_timer.Step('send_queued_emails'):
                self._log_tick_msg(
                    'Starting email_manager.manager.send_queued_emails')
                email_manager.manager.send_queued_emails()
            with breakdown_timer.Step('db_reset_queries'):
                self._log_tick_msg('Starting django.db.reset_queries')
                django.db.reset_queries()

            self._tick_count += 1
            metrics.Counter('chromeos/autotest/scheduler/tick').increment()
mbligh36768f02008-02-22 18:28:33 +0000396
showard97aed502008-11-04 02:01:24 +0000397
    @_calls_log_tick_msg
    def _run_cleanup(self):
        """Run the periodic and 24-hour cleanup passes if they are due."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000402
mbligh36768f02008-02-22 18:28:33 +0000403
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800404 @_calls_log_tick_msg
showardf13a9e22009-12-18 22:54:09 +0000405 def _garbage_collection(self):
406 threshold_time = time.time() - self._seconds_between_garbage_stats
407 if threshold_time < self._last_garbage_stats_time:
408 # Don't generate these reports very often.
409 return
410
411 self._last_garbage_stats_time = time.time()
412 # Force a full level 0 collection (because we can, it doesn't hurt
413 # at this interval).
414 gc.collect()
415 logging.info('Logging garbage collector stats on tick %d.',
416 self._tick_count)
417 gc_stats._log_garbage_collector_stats()
418
419
showard170873e2009-01-07 00:22:26 +0000420 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
421 for object_id in object_ids:
422 agent_dict.setdefault(object_id, set()).add(agent)
423
424
425 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
426 for object_id in object_ids:
427 assert object_id in agent_dict
428 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700429 # If an ID has no more active agent associated, there is no need to
430 # keep it in the dictionary. Otherwise, scheduler will keep an
431 # unnecessarily big dictionary until being restarted.
432 if not agent_dict[object_id]:
433 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000434
435
showardd1195652009-12-08 22:21:02 +0000436 def add_agent_task(self, agent_task):
beeps8bb1f7d2013-08-05 01:30:09 -0700437 """
438 Creates and adds an agent to the dispatchers list.
439
440 In creating the agent we also pass on all the queue_entry_ids and
441 host_ids from the special agent task. For every agent we create, we
442 add it to 1. a dict against the queue_entry_ids given to it 2. A dict
443 against the host_ids given to it. So theoritically, a host can have any
444 number of agents associated with it, and each of them can have any
445 special agent task, though in practice we never see > 1 agent/task per
446 host at any time.
447
448 @param agent_task: A SpecialTask for the agent to manage.
449 """
showardd1195652009-12-08 22:21:02 +0000450 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000451 self._agents.append(agent)
452 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000453 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
454 self._register_agent_for_ids(self._queue_entry_agents,
455 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000456
showard170873e2009-01-07 00:22:26 +0000457
458 def get_agents_for_entry(self, queue_entry):
459 """
460 Find agents corresponding to the specified queue_entry.
461 """
showardd3dc1992009-04-22 21:01:40 +0000462 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000463
464
465 def host_has_agent(self, host):
466 """
467 Determine if there is currently an Agent present using this host.
468 """
469 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000470
471
jadmanski0afbb632008-06-06 21:10:57 +0000472 def remove_agent(self, agent):
473 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000474 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
475 agent)
476 self._unregister_agent_for_ids(self._queue_entry_agents,
477 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000478
479
showard8cc058f2009-09-08 16:26:33 +0000480 def _host_has_scheduled_special_task(self, host):
481 return bool(models.SpecialTask.objects.filter(host__id=host.id,
482 is_active=False,
483 is_complete=False))
484
485
    def _recover_processes(self):
        """Reattach to autoserv processes left over from a prior scheduler.

        Builds recovery AgentTasks, registers their pidfiles before the
        drone refresh (so refresh can find them), then re-adopts tasks and
        pending entries and reverifies whatever remains.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000498
showard170873e2009-01-07 00:22:26 +0000499
showardd1195652009-12-08 22:21:02 +0000500 def _create_recovery_agent_tasks(self):
501 return (self._get_queue_entry_agent_tasks()
502 + self._get_special_task_agent_tasks(is_active=True))
503
504
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        # Per-status tally of entries seen, exported as a gauge below.
        hqe_count_by_status = {}
        for entry in queue_entries:
            hqe_count_by_status[entry.status] = (
                hqe_count_by_status.get(entry.status, 0) + 1)
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            # A synchronous job yields one task covering several entries;
            # mark them all used so we don't create duplicate tasks.
            used_queue_entries.update(agent_task.queue_entries)

        for status, count in hqe_count_by_status.iteritems():
            metrics.Gauge(
                'chromeos/autotest/scheduler/active_host_queue_entries'
            ).set(count, fields={'status': status})

        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000550
551
showardd1195652009-12-08 22:21:02 +0000552 def _get_special_task_agent_tasks(self, is_active=False):
553 special_tasks = models.SpecialTask.objects.filter(
554 is_active=is_active, is_complete=False)
555 return [self._get_agent_task_for_special_task(task)
556 for task in special_tasks]
557
558
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        @raises SchedulerError: if the entry's status maps to no task type,
                or if a host in the group already has an agent.
        """
        # All entries in the same synchronous group share one task.
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000584
585
586 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000587 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
588 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000589 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000590 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000591 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000592 if using_host:
showardd1195652009-12-08 22:21:02 +0000593 self._assert_host_has_no_agent(task_entry)
594
595
596 def _assert_host_has_no_agent(self, entry):
597 """
598 @param entry: a HostQueueEntry or a SpecialTask
599 """
600 if self.host_has_agent(entry.host):
601 agent = tuple(self._host_agents.get(entry.host.id))[0]
Prashanth B0e960282014-05-13 19:38:28 -0700602 raise scheduler_lib.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000603 'While scheduling %s, host %s already has a host agent %s'
604 % (entry, entry.host, agent.task))
605
606
    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
        object contains a hqe it's passed on to the special agent task, which
        creates a HostQueueEntry and saves it as its queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        @raises SchedulerError: if no AgentTask class matches the task type.
        """
        self._assert_host_has_no_agent(special_task)

        # Candidate task classes; matched by their TASK_TYPE marker below.
        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise scheduler_lib.SchedulerError(
                'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000637
638
639 def _register_pidfiles(self, agent_tasks):
640 for agent_task in agent_tasks:
641 agent_task.register_necessary_pidfiles()
642
643
644 def _recover_tasks(self, agent_tasks):
645 orphans = _drone_manager.get_orphaned_autoserv_processes()
646
647 for agent_task in agent_tasks:
648 agent_task.recover()
649 if agent_task.monitor and agent_task.monitor.has_process():
650 orphans.discard(agent_task.monitor.get_process())
651 self.add_agent_task(agent_task)
652
653 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000654
655
showard8cc058f2009-09-08 16:26:33 +0000656 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000657 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
658 % status):
showard0db3d432009-10-12 20:29:15 +0000659 if entry.status == status and not self.get_agents_for_entry(entry):
660 # The status can change during iteration, e.g., if job.run()
661 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000662 yield entry
663
664
showard6878e8b2009-07-20 22:37:45 +0000665 def _check_for_remaining_orphan_processes(self, orphans):
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800666 m = 'chromeos/autotest/errors/unrecovered_orphan_processes'
667 metrics.Gauge(m).set(len(orphans))
668
showard6878e8b2009-07-20 22:37:45 +0000669 if not orphans:
670 return
671 subject = 'Unrecovered orphan autoserv processes remain'
672 message = '\n'.join(str(process) for process in orphans)
mbligh5fa9e112009-08-03 16:46:06 +0000673 die_on_orphans = global_config.global_config.get_config_value(
674 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
675
676 if die_on_orphans:
677 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000678
showard170873e2009-01-07 00:22:26 +0000679
showard8cc058f2009-09-08 16:26:33 +0000680 def _recover_pending_entries(self):
681 for entry in self._get_unassigned_entries(
682 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000683 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000684 entry.on_pending()
685
686
showardb8900452009-10-12 20:31:01 +0000687 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000688 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000689 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
690 unrecovered_hqes = []
691 for queue_entry in queue_entries:
692 special_tasks = models.SpecialTask.objects.filter(
693 task__in=(models.SpecialTask.Task.CLEANUP,
694 models.SpecialTask.Task.VERIFY),
695 queue_entry__id=queue_entry.id,
696 is_complete=False)
697 if special_tasks.count() == 0:
698 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000699
showardb8900452009-10-12 20:31:01 +0000700 if unrecovered_hqes:
701 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700702 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000703 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000704 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000705
706
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800707 @_calls_log_tick_msg
showard65db3932009-10-28 19:54:35 +0000708 def _schedule_special_tasks(self):
709 """
710 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700711
712 Special tasks include PreJobTasks like verify, reset and cleanup.
713 They are created through _schedule_new_jobs and associated with a hqe
714 This method translates SpecialTasks to the appropriate AgentTask and
715 adds them to the dispatchers agents list, so _handle_agents can execute
716 them.
showard65db3932009-10-28 19:54:35 +0000717 """
Prashanth B4ec98672014-05-15 10:44:54 -0700718 # When the host scheduler is responsible for acquisition we only want
719 # to run tasks with leased hosts. All hqe tasks will already have
720 # leased hosts, and we don't want to run frontend tasks till the host
721 # scheduler has vetted the assignment. Note that this doesn't include
722 # frontend tasks with hosts leased by other active hqes.
723 for task in self._job_query_manager.get_prioritized_special_tasks(
Prathmesh Prabhu468c32c2016-12-20 15:12:30 -0800724 only_tasks_with_leased_hosts=not self._inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000725 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000726 continue
showardd1195652009-12-08 22:21:02 +0000727 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000728
729
showard170873e2009-01-07 00:22:26 +0000730 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000731 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000732 # should never happen
showarded2afea2009-07-07 20:54:07 +0000733 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000734 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000735 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700736 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000737 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000738
739
jadmanski0afbb632008-06-06 21:10:57 +0000740 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000741 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700742 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000743 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000744 if self.host_has_agent(host):
745 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000746 continue
showard8cc058f2009-09-08 16:26:33 +0000747 if self._host_has_scheduled_special_task(host):
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700748 # host will have a special task scheduled on the next tick
showard8cc058f2009-09-08 16:26:33 +0000749 continue
showard170873e2009-01-07 00:22:26 +0000750 if print_message:
showardb18134f2009-03-20 20:52:18 +0000751 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000752 models.SpecialTask.objects.create(
753 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000754 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000755
756
jadmanski0afbb632008-06-06 21:10:57 +0000757 def _recover_hosts(self):
758 # recover "Repair Failed" hosts
759 message = 'Reverifying dead host %s'
760 self._reverify_hosts_where("status = 'Repair Failed'",
761 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000762
763
showard89f84db2009-03-12 20:39:13 +0000764 def _refresh_pending_queue_entries(self):
765 """
766 Lookup the pending HostQueueEntries and call our HostScheduler
767 refresh() method given that list. Return the list.
768
769 @returns A list of pending HostQueueEntries sorted in priority order.
770 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700771 queue_entries = self._job_query_manager.get_pending_queue_entries(
Prathmesh Prabhu468c32c2016-12-20 15:12:30 -0800772 only_hostless=not self._inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000773 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000774 return []
showard89f84db2009-03-12 20:39:13 +0000775 return queue_entries
776
777
showarda9545c02009-12-18 22:44:26 +0000778 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800779 """Schedule a hostless (suite) job.
780
781 @param queue_entry: The queue_entry representing the hostless job.
782 """
showarda9545c02009-12-18 22:44:26 +0000783 self.add_agent_task(HostlessQueueTask(queue_entry))
Jakob Juelichd615a1e2014-09-04 11:48:36 -0700784
785 # Need to set execution_subdir before setting the status:
786 # After a restart of the scheduler, agents will be restored for HQEs in
787 # Starting, Running, Gathering, Parsing or Archiving. To do this, the
788 # execution_subdir is needed. Therefore it must be set before entering
789 # one of these states.
790 # Otherwise, if the scheduler was interrupted between setting the status
791 # and the execution_subdir, upon it's restart restoring agents would
792 # fail.
793 # Is there a way to get a status in one of these states without going
794 # through this code? Following cases are possible:
795 # - If it's aborted before being started:
796 # active bit will be 0, so there's nothing to parse, it will just be
797 # set to completed by _find_aborting. Critical statuses are skipped.
798 # - If it's aborted or it fails after being started:
799 # It was started, so this code was executed.
800 queue_entry.update_field('execution_subdir', 'hostless')
jamesren47bd7372010-03-13 00:58:17 +0000801 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000802
803
beepscc9fc702013-12-02 12:45:38 -0800804 def _schedule_host_job(self, host, queue_entry):
805 """Schedules a job on the given host.
806
807 1. Assign the host to the hqe, if it isn't already assigned.
808 2. Create a SpecialAgentTask for the hqe.
809 3. Activate the hqe.
810
811 @param queue_entry: The job to schedule.
812 @param host: The host to schedule the job on.
813 """
814 if self.host_has_agent(host):
815 host_agent_task = list(self._host_agents.get(host.id))[0].task
beepscc9fc702013-12-02 12:45:38 -0800816 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700817 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800818
819
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800820 @_calls_log_tick_msg
showard89f84db2009-03-12 20:39:13 +0000821 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700822 """
823 Find any new HQEs and call schedule_pre_job_tasks for it.
824
825 This involves setting the status of the HQE and creating a row in the
826 db corresponding the the special task, through
827 scheduler_models._queue_special_task. The new db row is then added as
828 an agent to the dispatcher through _schedule_special_tasks and
829 scheduled for execution on the drone through _handle_agents.
830 """
showard89f84db2009-03-12 20:39:13 +0000831 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000832
beepscc9fc702013-12-02 12:45:38 -0800833 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700834 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700835 new_jobs_with_hosts = 0
836 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800837 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700838 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000839
beepscc9fc702013-12-02 12:45:38 -0800840 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000841 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000842 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700843 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000844 else:
beepscc9fc702013-12-02 12:45:38 -0800845 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700846 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700847
Prathmesh Prabhued7ece92016-11-23 11:19:43 -0800848 metrics.Counter(
849 'chromeos/autotest/scheduler/scheduled_jobs_hostless'
850 ).increment_by(new_hostless_jobs)
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800851
beepscc9fc702013-12-02 12:45:38 -0800852 if not host_jobs:
853 return
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800854
Prathmesh Prabhu468c32c2016-12-20 15:12:30 -0800855 if not self._inline_host_acquisition:
Prathmesh Prabhu3c6a3bf2016-12-20 11:29:02 -0800856 # In this case, host_scheduler is responsible for scheduling
857 # host_jobs. Scheduling the jobs ourselves can lead to DB corruption
858 # since host_scheduler assumes it is the single process scheduling
859 # host jobs.
860 metrics.Gauge(
861 'chromeos/autotest/errors/scheduler/unexpected_host_jobs').set(
862 len(host_jobs))
863 return
864
Prashanth Bf66d51b2014-05-06 12:42:25 -0700865 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
866 for host_assignment in jobs_with_hosts:
867 self._schedule_host_job(host_assignment.host, host_assignment.job)
868 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800869
Prathmesh Prabhued7ece92016-11-23 11:19:43 -0800870 metrics.Counter(
871 'chromeos/autotest/scheduler/scheduled_jobs_with_hosts'
872 ).increment_by(new_jobs_with_hosts)
873 # TODO(pprabhu): Decide what to do about this metric. Million dollar
874 # question: What happens to jobs that were not matched. Do they stay in
875 # the queue, and get processed right here in the next tick (then we want
876 # a guage corresponding to the number of outstanding unmatched host
877 # jobs), or are they handled somewhere else (then we need a counter
878 # corresponding to failed_to_match_with_hosts jobs).
879 #autotest_stats.Gauge(key).send('new_jobs_without_hosts',
880 # new_jobs_need_hosts -
881 # new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000882
883
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800884 @_calls_log_tick_msg
showard8cc058f2009-09-08 16:26:33 +0000885 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700886 """
887 Adds agents to the dispatcher.
888
889 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
890 QueueTask for example, will have a job with a control file, and
891 the agent will have methods that poll, abort and check if the queue
892 task is finished. The dispatcher runs the agent_task, as well as
893 other agents in it's _agents member, through _handle_agents, by
894 calling the Agents tick().
895
896 This method creates an agent for each HQE in one of (starting, running,
897 gathering, parsing, archiving) states, and adds it to the dispatcher so
898 it is handled by _handle_agents.
899 """
showardd1195652009-12-08 22:21:02 +0000900 for agent_task in self._get_queue_entry_agent_tasks():
901 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000902
903
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800904 @_calls_log_tick_msg
jadmanski0afbb632008-06-06 21:10:57 +0000905 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700906 """
907 Looks through the afe_host_queue_entries for an aborted entry.
908
909 The aborted bit is set on an HQE in many ways, the most common
910 being when a user requests an abort through the frontend, which
911 results in an rpc from the afe to abort_host_queue_entries.
912 """
jamesrene7c65cb2010-06-08 20:38:10 +0000913 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000914 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700915 where='aborted=1 and complete=0'):
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800916
917 # If the job is running on a shard, let the shard handle aborting
918 # it and sync back the right status.
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -0800919 if entry.job.shard_id is not None and not server_utils.is_shard():
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800920 logging.info('Waiting for shard %s to abort hqe %s',
921 entry.job.shard_id, entry)
922 continue
923
showardf4a2e502009-07-28 20:06:39 +0000924 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800925
926 # The task would have started off with both is_complete and
927 # is_active = False. Aborted tasks are neither active nor complete.
928 # For all currently active tasks this will happen through the agent,
929 # but we need to manually update the special tasks that haven't
930 # started yet, because they don't have agents.
931 models.SpecialTask.objects.filter(is_active=False,
932 queue_entry_id=entry.id).update(is_complete=True)
933
showardd3dc1992009-04-22 21:01:40 +0000934 for agent in self.get_agents_for_entry(entry):
935 agent.abort()
936 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000937 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700938 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000939 for job in jobs_to_stop:
940 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000941
942
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800943 @_calls_log_tick_msg
beeps8bb1f7d2013-08-05 01:30:09 -0700944 def _find_aborted_special_tasks(self):
945 """
946 Find SpecialTasks that have been marked for abortion.
947
948 Poll the database looking for SpecialTasks that are active
949 and have been marked for abortion, then abort them.
950 """
951
952 # The completed and active bits are very important when it comes
953 # to scheduler correctness. The active bit is set through the prolog
954 # of a special task, and reset through the cleanup method of the
955 # SpecialAgentTask. The cleanup is called both through the abort and
956 # epilog. The complete bit is set in several places, and in general
957 # a hanging job will have is_active=1 is_complete=0, while a special
958 # task which completed will have is_active=0 is_complete=1. To check
959 # aborts we directly check active because the complete bit is set in
960 # several places, including the epilog of agent tasks.
961 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
962 is_aborted=True)
963 for task in aborted_tasks:
964 # There are 2 ways to get the agent associated with a task,
965 # through the host and through the hqe. A special task
966 # always needs a host, but doesn't always need a hqe.
967 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700968 if isinstance(agent.task, agent_task.SpecialAgentTask):
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000969
beeps8bb1f7d2013-08-05 01:30:09 -0700970 # The epilog preforms critical actions such as
971 # queueing the next SpecialTask, requeuing the
972 # hqe etc, however it doesn't actually kill the
973 # monitor process and set the 'done' bit. Epilogs
974 # assume that the job failed, and that the monitor
975 # process has already written an exit code. The
976 # done bit is a necessary condition for
977 # _handle_agents to schedule any more special
978 # tasks against the host, and it must be set
979 # in addition to is_active, is_complete and success.
980 agent.task.epilog()
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000981 agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700982
983
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700984 def _can_start_agent(self, agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000985 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000986 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000987 return True
988 # don't allow any nonzero-process agents to run after we've reached a
989 # limit (this avoids starvation of many-process agents)
990 if have_reached_limit:
991 return False
992 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000993 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000994 agent.task.owner_username,
995 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000996 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000997 return False
showard4c5374f2008-09-04 17:02:56 +0000998 return True
999
1000
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -08001001 @_calls_log_tick_msg
jadmanski0afbb632008-06-06 21:10:57 +00001002 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -07001003 """
1004 Handles agents of the dispatcher.
1005
1006 Appropriate Agents are added to the dispatcher through
1007 _schedule_running_host_queue_entries. These agents each
1008 have a task. This method runs the agents task through
1009 agent.tick() leading to:
1010 agent.start
1011 prolog -> AgentTasks prolog
1012 For each queue entry:
1013 sets host status/status to Running
1014 set started_on in afe_host_queue_entries
1015 run -> AgentTasks run
1016 Creates PidfileRunMonitor
1017 Queues the autoserv command line for this AgentTask
1018 via the drone manager. These commands are executed
1019 through the drone managers execute actions.
1020 poll -> AgentTasks/BaseAgentTask poll
1021 checks the monitors exit_code.
1022 Executes epilog if task is finished.
1023 Executes AgentTasks _finish_task
1024 finish_task is usually responsible for setting the status
1025 of the HQE/host, and updating it's active and complete fileds.
1026
1027 agent.is_done
1028 Removed the agent from the dispatchers _agents queue.
1029 Is_done checks the finished bit on the agent, that is
1030 set based on the Agents task. During the agents poll
1031 we check to see if the monitor process has exited in
1032 it's finish method, and set the success member of the
1033 task based on this exit code.
1034 """
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001035 num_started_this_tick = 0
1036 num_finished_this_tick = 0
showard4c5374f2008-09-04 17:02:56 +00001037 have_reached_limit = False
1038 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -07001039 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +00001040 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -07001041 self._log_extra_msg('Processing Agent with Host Ids: %s and '
1042 'queue_entry ids:%s' % (agent.host_ids,
1043 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +00001044 if not agent.started:
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001045 if not self._can_start_agent(agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001046 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -07001047 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +00001048 continue
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001049 num_started_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001050 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +00001051 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -07001052 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +00001053 if agent.is_done():
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001054 num_finished_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001055 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +00001056 self.remove_agent(agent)
Prathmesh Prabhued7ece92016-11-23 11:19:43 -08001057
1058 metrics.Counter(
1059 'chromeos/autotest/scheduler/agent_processes_started'
1060 ).increment_by(num_started_this_tick)
1061 metrics.Counter(
1062 'chromeos/autotest/scheduler/agent_processes_finished'
1063 ).increment_by(num_finished_this_tick)
1064 num_agent_processes = _drone_manager.total_running_processes()
1065 metrics.Gauge(
1066 'chromeos/autotest/scheduler/agent_processes'
1067 ).set(num_agent_processes)
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001068 logging.info('%d running processes. %d added this tick.',
Prathmesh Prabhued7ece92016-11-23 11:19:43 -08001069 num_agent_processes, num_started_this_tick)
mbligh36768f02008-02-22 18:28:33 +00001070
1071
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -08001072 def _log_tick_msg(self, msg):
1073 if self._tick_debug:
1074 logging.debug(msg)
1075
1076
1077 def _log_extra_msg(self, msg):
1078 if self._extra_debugging:
1079 logging.debug(msg)
1080
1081
Simran Basia858a232012-08-21 11:04:37 -07001082SiteDispatcher = utils.import_site_class(
1083 __file__, 'autotest_lib.scheduler.site_monitor_db',
1084 'SiteDispatcher', BaseDispatcher)
1085
1086class Dispatcher(SiteDispatcher):
1087 pass
1088
1089
mbligh36768f02008-02-22 18:28:33 +00001090class Agent(object):
showard77182562009-06-10 00:16:05 +00001091 """
Alex Miller47715eb2013-07-24 03:34:01 -07001092 An agent for use by the Dispatcher class to perform a task. An agent wraps
1093 around an AgentTask mainly to associate the AgentTask with the queue_entry
1094 and host ids.
showard77182562009-06-10 00:16:05 +00001095
1096 The following methods are required on all task objects:
1097 poll() - Called periodically to let the task check its status and
1098 update its internal state. If the task succeeded.
1099 is_done() - Returns True if the task is finished.
1100 abort() - Called when an abort has been requested. The task must
1101 set its aborted attribute to True if it actually aborted.
1102
1103 The following attributes are required on all task objects:
1104 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001105 success - bool, True if this task succeeded.
1106 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1107 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001108 """
1109
1110
showard418785b2009-11-23 20:19:59 +00001111 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001112 """
Alex Miller47715eb2013-07-24 03:34:01 -07001113 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001114 """
showard8cc058f2009-09-08 16:26:33 +00001115 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001116
showard77182562009-06-10 00:16:05 +00001117 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001118 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001119
showard8cc058f2009-09-08 16:26:33 +00001120 self.queue_entry_ids = task.queue_entry_ids
1121 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001122
showard8cc058f2009-09-08 16:26:33 +00001123 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001124 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001125
1126
jadmanski0afbb632008-06-06 21:10:57 +00001127 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001128 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001129 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001130 self.task.poll()
1131 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001132 self.finished = True
showardec113162008-05-08 00:52:49 +00001133
1134
jadmanski0afbb632008-06-06 21:10:57 +00001135 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001136 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001137
1138
showardd3dc1992009-04-22 21:01:40 +00001139 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001140 if self.task:
1141 self.task.abort()
1142 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001143 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001144 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001145
showardd3dc1992009-04-22 21:01:40 +00001146
beeps5e2bb4a2013-10-28 11:26:45 -07001147class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001148 """
1149 Common functionality for QueueTask and HostlessQueueTask
1150 """
1151 def __init__(self, queue_entries):
1152 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001153 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001154 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001155
1156
showard73ec0442009-02-07 02:05:20 +00001157 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001158 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001159
1160
jamesrenc44ae992010-02-19 00:12:54 +00001161 def _write_control_file(self, execution_path):
1162 control_path = _drone_manager.attach_file_to_execution(
1163 execution_path, self.job.control_file)
1164 return control_path
1165
1166
Aviv Keshet308e7362013-05-21 14:43:16 -07001167 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001168 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001169 execution_path = self.queue_entries[0].execution_path()
1170 control_path = self._write_control_file(execution_path)
1171 hostnames = ','.join(entry.host.hostname
1172 for entry in self.queue_entries
1173 if not entry.is_hostless())
1174
1175 execution_tag = self.queue_entries[0].execution_tag()
1176 params = _autoserv_command_line(
1177 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001178 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001179 _drone_manager.absolute_path(control_path)],
1180 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001181 if self.job.is_image_update_job():
1182 params += ['--image', self.job.update_image_path]
1183
jamesrenc44ae992010-02-19 00:12:54 +00001184 return params
showardd1195652009-12-08 22:21:02 +00001185
1186
    @property
    def num_processes(self):
        """Number of processes this task accounts for: one per queue entry."""
        return len(self.queue_entries)
1190
1191
    @property
    def owner_username(self):
        """Username of the job's owner."""
        return self.job.owner
1195
1196
    def _working_directory(self):
        """@returns the execution path shared by all of this task's entries."""
        return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001199
1200
jadmanski0afbb632008-06-06 21:10:57 +00001201 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001202 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001203 keyval_dict = self.job.keyval_dict()
1204 keyval_dict[queued_key] = queued_time
showardf1ae3542009-05-11 19:26:02 +00001205 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001206 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001207 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001208 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001209
1210
showard35162b02009-03-03 02:17:30 +00001211 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001212 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001213 _drone_manager.write_lines_to_file(error_file_path,
1214 [_LOST_PROCESS_ERROR])
1215
1216
showardd3dc1992009-04-22 21:01:40 +00001217 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001218 if not self.monitor:
1219 return
1220
showardd9205182009-04-27 20:09:55 +00001221 self._write_job_finished()
1222
showard35162b02009-03-03 02:17:30 +00001223 if self.monitor.lost_process:
1224 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001225
jadmanskif7fa2cc2008-10-01 14:13:23 +00001226
showardcbd74612008-11-19 21:42:02 +00001227 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001228 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001229 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001230 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001231 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001232
1233
jadmanskif7fa2cc2008-10-01 14:13:23 +00001234 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001235 if not self.monitor or not self.monitor.has_process():
1236 return
1237
jadmanskif7fa2cc2008-10-01 14:13:23 +00001238 # build up sets of all the aborted_by and aborted_on values
1239 aborted_by, aborted_on = set(), set()
1240 for queue_entry in self.queue_entries:
1241 if queue_entry.aborted_by:
1242 aborted_by.add(queue_entry.aborted_by)
1243 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1244 aborted_on.add(t)
1245
1246 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001247 # TODO(showard): this conditional is now obsolete, we just need to leave
1248 # it in temporarily for backwards compatibility over upgrades. delete
1249 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001250 assert len(aborted_by) <= 1
1251 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001252 aborted_by_value = aborted_by.pop()
1253 aborted_on_value = max(aborted_on)
1254 else:
1255 aborted_by_value = 'autotest_system'
1256 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001257
showarda0382352009-02-11 23:36:43 +00001258 self._write_keyval_after_job("aborted_by", aborted_by_value)
1259 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001260
showardcbd74612008-11-19 21:42:02 +00001261 aborted_on_string = str(datetime.datetime.fromtimestamp(
1262 aborted_on_value))
1263 self._write_status_comment('Job aborted by %s on %s' %
1264 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001265
1266
jadmanski0afbb632008-06-06 21:10:57 +00001267 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001268 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001269 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001270 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001271
1272
jadmanski0afbb632008-06-06 21:10:57 +00001273 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001274 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001275 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001276
1277
class QueueTask(AbstractQueueTask):
    """Queue task that runs autoserv for a job's entries on real hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)
        # Whether autoserv is allowed to run server-side tests inside an
        # SSP container (global AUTOSERV/enable_ssp_container setting).
        self._enable_ssp_container = (
                global_config.global_config.get_config_value(
                    'AUTOSERV', 'enable_ssp_container', type=bool,
                    default=True))


    def prolog(self):
        """Validate entry/host statuses, then mark hosts running and dirty."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.host.set_status(models.Host.Status.RUNNING)
            queue_entry.host.update_field('dirty', 1)


    def _finish_task(self):
        """After the job finishes, move entries to GATHERING; hosts stay busy."""
        super(QueueTask, self)._finish_task()

        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
            queue_entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Build the autoserv invocation, adding SSP and lineage flags."""
        invocation = super(QueueTask, self)._command_line()
        # Server-side packaging applies only to server-type control files
        # that have not explicitly opted out.  require_ssp is tristate
        # (None/True/False), so compare identity with False rather than
        # equality.
        if (self._enable_ssp_container and
            self.job.control_type == control_data.CONTROL_TYPE.SERVER and
            self.job.require_ssp is not False):
            invocation += ['--require-ssp']
            keyval_dict = self.job.keyval_dict()
            test_source_build = keyval_dict.get('test_source_build')
            if test_source_build:
                invocation += ['--test_source_build', test_source_build]
        if self.job.parent_job_id:
            invocation += ['--parent_job_id', str(self.job.parent_job_id)]
        return invocation + ['--verify_job_repo_url']
Alex Miller9f01d5d2013-08-08 02:26:01 -07001326
1327
Dan Shi1a189052013-10-28 14:41:35 -07001328class HostlessQueueTask(AbstractQueueTask):
mbligh4608b002010-01-05 18:22:35 +00001329 def __init__(self, queue_entry):
1330 super(HostlessQueueTask, self).__init__([queue_entry])
1331 self.queue_entry_ids = [queue_entry.id]
1332
1333
1334 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001335 super(HostlessQueueTask, self).prolog()
1336
1337
mbligh4608b002010-01-05 18:22:35 +00001338 def _finish_task(self):
1339 super(HostlessQueueTask, self)._finish_task()
Dan Shi76af8022013-10-19 01:59:49 -07001340
1341 # When a job is added to database, its initial status is always
1342 # Starting. In a scheduler tick, scheduler finds all jobs in Starting
1343 # status, check if any of them can be started. If scheduler hits some
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001344 # limit, e.g., max_hostless_jobs_per_drone, scheduler will
1345 # leave these jobs in Starting status. Otherwise, the jobs'
1346 # status will be changed to Running, and an autoserv process
1347 # will be started in drone for each of these jobs.
Dan Shi76af8022013-10-19 01:59:49 -07001348 # If the entry is still in status Starting, the process has not started
1349 # yet. Therefore, there is no need to parse and collect log. Without
1350 # this check, exception will be raised by scheduler as execution_subdir
1351 # for this queue entry does not have a value yet.
1352 hqe = self.queue_entries[0]
1353 if hqe.status != models.HostQueueEntry.Status.STARTING:
1354 hqe.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001355
1356
mbligh36768f02008-02-22 18:28:33 +00001357if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00001358 main()