blob: 45071e8c2f3ebbc28e7aef1d0240f714f385326f [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Richard Barnetteffed1722016-05-18 15:57:22 -07002
3#pylint: disable=C0111
4
mbligh36768f02008-02-22 18:28:33 +00005"""
6Autotest scheduler
7"""
showard909c7a62008-07-15 21:52:38 +00008
Dan Shif6c65bd2014-08-29 16:15:07 -07009import datetime
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -080010import functools
Dan Shif6c65bd2014-08-29 16:15:07 -070011import gc
12import logging
13import optparse
14import os
15import signal
16import sys
17import time
showard402934a2009-12-21 22:20:47 +000018
Alex Miller05d7b4c2013-03-04 07:49:38 -080019import common
showard21baa452008-10-21 00:08:39 +000020from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000021
22import django.db
Aviv Keshet65fed072016-06-29 10:20:55 -070023from chromite.lib import metrics
Richard Barnetteffed1722016-05-18 15:57:22 -070024from chromite.lib import ts_mon_config
showard402934a2009-12-21 22:20:47 +000025
Dan Shiec1d47d2015-02-13 11:38:13 -080026from autotest_lib.client.common_lib import control_data
Prashanth B0e960282014-05-13 19:38:28 -070027from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070028from autotest_lib.client.common_lib import utils
Prashanth B0e960282014-05-13 19:38:28 -070029from autotest_lib.frontend.afe import models, rpc_utils
Fang Dengc330bee2014-10-21 18:10:55 -070030from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070031from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
32from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070033from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070034from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070035from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000036from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080037from autotest_lib.scheduler import status_server, scheduler_config
Aviv Keshet308e7362013-05-21 14:43:16 -070038from autotest_lib.server import autoserv_utils
Dan Shi114e1722016-01-10 18:12:53 -080039from autotest_lib.server import system_utils
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080040from autotest_lib.server import utils as server_utils
Dan Shicf2e8dd2015-05-07 17:18:48 -070041from autotest_lib.site_utils import metadata_reporter
Dan Shib9144a42014-12-01 16:09:32 -080042from autotest_lib.site_utils import server_manager_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080043
Dan Shicf2e8dd2015-05-07 17:18:48 -070044
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Default results directory; overwritten with the positional CLI argument in
# main_without_exception_handling().
RESULTS_DIR = '.'

# Root of the autotest tree, derived from this file's location but
# overridable through the AUTOTEST_DIR environment variable.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# NOTE: dict.has_key() is deprecated (and removed in Python 3); the `in`
# operator is the equivalent, portable membership test.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""
65
# Database connection manager and raw connection; both created in
# initialize() and torn down at the end of main_without_exception_handling().
_db_manager = None
_db = None
# Set to True by handle_signal() (SIGINT/SIGTERM) to make the main loop exit
# after the current tick.
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
# DroneManager singleton; bound in initialize_globals().
_drone_manager = None

# When True, the scheduler acquires and releases hosts against jobs inline
# with the tick; when False it only handles jobs that already have hosts.
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)

# Whether autoserv jobs may run inside server-side packaging containers.
_enable_ssp_container = global_config.global_config.get_config_value(
        'AUTOSERV', 'enable_ssp_container', type=bool,
        default=True)
mbligh36768f02008-02-22 18:28:33 +000082
mbligh83c1e9e2009-05-01 23:10:41 +000083def _site_init_monitor_db_dummy():
84 return {}
85
86
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled without a default set."""
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise scheduler_lib.SchedulerError(
            'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000092
93
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler.

    Currently this only checks the drone-set configuration; it raises
    SchedulerError (via _verify_default_drone_set_exists) on inconsistency.
    """
    _verify_default_drone_set_exists()
97
98
def main():
    """Entry point: run the scheduler and always remove our pid file on exit.

    SystemExit propagates untouched; any other escaping exception is logged
    before being re-raised so the crash is visible in the scheduler log.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000110
111
def main_without_exception_handling():
    """Parse arguments, initialize the scheduler, and run the tick loop.

    Exits early (sys.exit(1)) when the scheduler is disabled in global_config
    or another instance holds the pid file (checked in initialize()).  The
    loop runs until _shutdown is flagged by a signal or the status server
    requests a scheduler shutdown; teardown then flushes email and metadata
    reporters and closes drone/database connections.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Optional site-specific initialization hook; falls back to the no-op
    # _site_init_monitor_db_dummy when no site module is present.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    # Start the thread to report metadata.
    metadata_reporter.start()

    with ts_mon_config.SetupTsMonGlobalState('autotest_scheduler',
                                             indirect=True):
        try:
            initialize()
            dispatcher = Dispatcher()
            dispatcher.initialize(recover_hosts=options.recover_hosts)
            minimum_tick_sec = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)

            # Main loop: one dispatcher tick per iteration, padded out to at
            # least minimum_tick_sec so we don't spin when the tick is cheap.
            while not _shutdown and not server._shutdown_scheduler:
                start = time.time()
                dispatcher.tick()
                curr_tick_sec = time.time() - start
                if minimum_tick_sec > curr_tick_sec:
                    time.sleep(minimum_tick_sec - curr_tick_sec)
                else:
                    time.sleep(0.0001)
        except server_manager_utils.ServerActionError as e:
            # This error is expected when the server is not in primary status
            # for scheduler role. Thus do not send email for it.
            logging.exception(e)
        except Exception:
            email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    metadata_reporter.abort()
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000205
206
def handle_signal(signum, frame):
    """Signal handler for SIGINT/SIGTERM: request a clean shutdown.

    Sets the module-level _shutdown flag, which the main loop in
    main_without_exception_handling() checks once per tick.

    @param signum: The signal number received (unused).
    @param frame: The interrupted stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000211
212
def initialize():
    """One-time scheduler startup: pid file, DB, signals, and drones.

    Aborts (sys.exit(1)) if another monitor_db instance already holds the
    pid file.  On success, the module globals _db_manager, _db and
    _drone_manager are all live and the drone manager is pointed at
    RESULTS_DIR.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start a second scheduler against the same pid file.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point at the stress-test database instead of the real one.
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    # If server database is enabled, check if the server has role `scheduler`.
    # If the server does not have scheduler role, exception will be raised and
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    drone_list = system_utils.get_drones()
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000252
253
def initialize_globals():
    """Bind the module-level _drone_manager to the DroneManager singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
257
258
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """Build the autoserv command line for running a job in the lab.

    @param machines: A machine or comma separated list of machines for the
            (-m) flag.
    @param extra_args: List of additional arguments to pass to autoserv.
    @param job: Job object - if supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry: A HostQueueEntry object - if supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose: Whether autoserv should run with verbose logging.

    @returns The autoserv command line as a list of executable + parameters.
    """
    return autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose, in_lab=True)
showard87ba02a2009-04-20 19:37:32 +0000277
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800278def _calls_log_tick_msg(func):
279 """Used to trace functions called by BaseDispatcher.tick."""
280 @functools.wraps(func)
281 def wrapper(self, *args, **kwargs):
282 self._log_tick_msg('Starting %s' % func.__name__)
283 return func(self, *args, **kwargs)
284
285 return wrapper
286
showard87ba02a2009-04-20 19:37:32 +0000287
Simran Basia858a232012-08-21 11:04:37 -0700288class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800289
290
    def __init__(self):
        # Agents currently owned by this dispatcher (see add_agent_task).
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db, _drone_manager)
        # Indexes from host id / queue entry id to the set of Agents touching
        # them; maintained by _register_agent_for_ids and
        # _unregister_agent_for_ids.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        # Throttle state for _garbage_collection(): GC stats are logged at
        # most once per _seconds_between_garbage_stats seconds.
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        # Verbosity knobs; presumably consulted by _log_tick_msg /
        # _log_extra_msg — confirm against the rest of the class.
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
322
mbligh36768f02008-02-22 18:28:33 +0000323
    def initialize(self, recover_hosts=True):
        """Perform one-time recovery work before the first tick.

        @param recover_hosts: Whether to also recover hosts left in
                unexpected states (driven by the --recover-hosts flag).
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()
        # Execute all actions queued in the cleanup tasks. Scheduler tick will
        # run a refresh task first. If there is any action in the queue, refresh
        # will raise an exception.
        _drone_manager.execute_actions()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000337
338
    # TODO(pprabhu) Drop this metric once tick_times has been verified.
    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/scheduler/tick_durations/tick')
    def tick(self):
        """Run one scheduler cycle.

        Every major phase is wrapped in a breakdown_timer Step so the
        per-phase share of the tick time is reported to the
        chromeos/autotest/scheduler/tick_times metric.  The ordering of the
        phases is significant — see the comment about _run_cleanup below.
        """
        with metrics.RuntimeBreakdownTimer(
            'chromeos/autotest/scheduler/tick_times') as breakdown_timer:
            self._log_tick_msg('New tick')
            system_utils.DroneCache.refresh()

            with breakdown_timer.Step('garbage_collection'):
                self._garbage_collection()
            with breakdown_timer.Step('trigger_refresh'):
                self._log_tick_msg('Starting _drone_manager.trigger_refresh')
                _drone_manager.trigger_refresh()
            with breakdown_timer.Step('process_recurring_runs'):
                self._process_recurring_runs()
            with breakdown_timer.Step('schedule_delay_tasks'):
                self._schedule_delay_tasks()
            with breakdown_timer.Step('schedule_running_host_queue_entries'):
                self._schedule_running_host_queue_entries()
            with breakdown_timer.Step('schedule_special_tasks'):
                self._schedule_special_tasks()
            with breakdown_timer.Step('schedule_new_jobs'):
                self._schedule_new_jobs()
            with breakdown_timer.Step('sync_refresh'):
                self._log_tick_msg('Starting _drone_manager.sync_refresh')
                _drone_manager.sync_refresh()
            # _run_cleanup must be called between drone_manager.sync_refresh,
            # and drone_manager.execute_actions, as sync_refresh will clear the
            # calls queued in drones. Therefore, any action that calls
            # drone.queue_call to add calls to the drone._calls, should be after
            # drone refresh is completed and before
            # drone_manager.execute_actions at the end of the tick.
            with breakdown_timer.Step('run_cleanup'):
                self._run_cleanup()
            with breakdown_timer.Step('find_aborting'):
                self._find_aborting()
            with breakdown_timer.Step('find_aborted_special_tasks'):
                self._find_aborted_special_tasks()
            with breakdown_timer.Step('handle_agents'):
                self._handle_agents()
            with breakdown_timer.Step('host_scheduler_tick'):
                self._log_tick_msg('Starting _host_scheduler.tick')
                self._host_scheduler.tick()
            with breakdown_timer.Step('drones_execute_actions'):
                self._log_tick_msg('Starting _drone_manager.execute_actions')
                _drone_manager.execute_actions()
            with breakdown_timer.Step('send_queued_emails'):
                self._log_tick_msg(
                    'Starting email_manager.manager.send_queued_emails')
                email_manager.manager.send_queued_emails()
            with breakdown_timer.Step('db_reset_queries'):
                self._log_tick_msg('Starting django.db.reset_queries')
                django.db.reset_queries()

            self._tick_count += 1
            metrics.Counter('chromeos/autotest/scheduler/tick').increment()
mbligh36768f02008-02-22 18:28:33 +0000401
showard97aed502008-11-04 02:01:24 +0000402
    @_calls_log_tick_msg
    def _run_cleanup(self):
        """Run the periodic and 24-hour upkeep tasks if they are due.

        Both objects decide internally ("maybe") whether enough time has
        passed since their last run.
        """
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000407
mbligh36768f02008-02-22 18:28:33 +0000408
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800409 @_calls_log_tick_msg
showardf13a9e22009-12-18 22:54:09 +0000410 def _garbage_collection(self):
411 threshold_time = time.time() - self._seconds_between_garbage_stats
412 if threshold_time < self._last_garbage_stats_time:
413 # Don't generate these reports very often.
414 return
415
416 self._last_garbage_stats_time = time.time()
417 # Force a full level 0 collection (because we can, it doesn't hurt
418 # at this interval).
419 gc.collect()
420 logging.info('Logging garbage collector stats on tick %d.',
421 self._tick_count)
422 gc_stats._log_garbage_collector_stats()
423
424
showard170873e2009-01-07 00:22:26 +0000425 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
426 for object_id in object_ids:
427 agent_dict.setdefault(object_id, set()).add(agent)
428
429
430 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
431 for object_id in object_ids:
432 assert object_id in agent_dict
433 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700434 # If an ID has no more active agent associated, there is no need to
435 # keep it in the dictionary. Otherwise, scheduler will keep an
436 # unnecessarily big dictionary until being restarted.
437 if not agent_dict[object_id]:
438 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000439
440
    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatchers list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to 1. a dict against the queue_entry_ids given to it 2. A dict
        against the host_ids given to it. So theoretically, a host can have any
        number of agents associated with it, and each of them can have any
        special agent task, though in practice we never see > 1 agent/task per
        host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        # Back-reference so the agent can reach dispatcher state.
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000461
showard170873e2009-01-07 00:22:26 +0000462
463 def get_agents_for_entry(self, queue_entry):
464 """
465 Find agents corresponding to the specified queue_entry.
466 """
showardd3dc1992009-04-22 21:01:40 +0000467 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000468
469
470 def host_has_agent(self, host):
471 """
472 Determine if there is currently an Agent present using this host.
473 """
474 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000475
476
    def remove_agent(self, agent):
        """Forget *agent*: drop it from the agent list and both id indexes."""
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000483
484
    def _host_has_scheduled_special_task(self, host):
        """Return True if *host* has a special task queued but not yet started."""
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))
489
490
    def _recover_processes(self):
        """Reattach to autoserv processes that survived a scheduler restart.

        Recovery tasks must register their pidfiles before the drone refresh
        so the refresh can collect their status.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000503
showard170873e2009-01-07 00:22:26 +0000504
    def _create_recovery_agent_tasks(self):
        """Return AgentTasks for all active queue entries and special tasks."""
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))
508
509
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        Also reports a per-status gauge of active host queue entries.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        hqe_count_by_status = {}
        for entry in queue_entries:
            hqe_count_by_status[entry.status] = (
                hqe_count_by_status.get(entry.status, 0) + 1)
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            # A multi-host (synchronous) job returns one task covering several
            # entries; mark them all so they aren't scheduled twice.
            used_queue_entries.update(agent_task.queue_entries)

        for status, count in hqe_count_by_status.iteritems():
            metrics.Gauge(
                'chromeos/autotest/scheduler/active_host_queue_entries'
            ).set(count, fields={'status': status})

        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000555
556
showardd1195652009-12-08 22:21:02 +0000557 def _get_special_task_agent_tasks(self, is_active=False):
558 special_tasks = models.SpecialTask.objects.filter(
559 is_active=is_active, is_complete=False)
560 return [self._get_agent_task_for_special_task(task)
561 for task in special_tasks]
562
563
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        @raises SchedulerError: if the entry's status maps to no AgentTask.
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000589
590
591 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000592 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
593 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000594 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000595 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000596 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000597 if using_host:
showardd1195652009-12-08 22:21:02 +0000598 self._assert_host_has_no_agent(task_entry)
599
600
601 def _assert_host_has_no_agent(self, entry):
602 """
603 @param entry: a HostQueueEntry or a SpecialTask
604 """
605 if self.host_has_agent(entry.host):
606 agent = tuple(self._host_agents.get(entry.host.id))[0]
Prashanth B0e960282014-05-13 19:38:28 -0700607 raise scheduler_lib.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000608 'While scheduling %s, host %s already has a host agent %s'
609 % (entry, entry.host, agent.task))
610
611
    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
        object contains a hqe it's passed on to the special agent task, which
        creates a HostQueueEntry and saves it as its queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        @raises SchedulerError: if no AgentTask class matches the task type.
        """
        self._assert_host_has_no_agent(special_task)

        # Candidate SpecialAgentTask subclasses; matched on TASK_TYPE below.
        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise scheduler_lib.SchedulerError(
                'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000642
643
644 def _register_pidfiles(self, agent_tasks):
645 for agent_task in agent_tasks:
646 agent_task.register_necessary_pidfiles()
647
648
649 def _recover_tasks(self, agent_tasks):
650 orphans = _drone_manager.get_orphaned_autoserv_processes()
651
652 for agent_task in agent_tasks:
653 agent_task.recover()
654 if agent_task.monitor and agent_task.monitor.has_process():
655 orphans.discard(agent_task.monitor.get_process())
656 self.add_agent_task(agent_task)
657
658 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000659
660
showard8cc058f2009-09-08 16:26:33 +0000661 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000662 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
663 % status):
showard0db3d432009-10-12 20:29:15 +0000664 if entry.status == status and not self.get_agents_for_entry(entry):
665 # The status can change during iteration, e.g., if job.run()
666 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000667 yield entry
668
669
showard6878e8b2009-07-20 22:37:45 +0000670 def _check_for_remaining_orphan_processes(self, orphans):
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800671 m = 'chromeos/autotest/errors/unrecovered_orphan_processes'
672 metrics.Gauge(m).set(len(orphans))
673
showard6878e8b2009-07-20 22:37:45 +0000674 if not orphans:
675 return
676 subject = 'Unrecovered orphan autoserv processes remain'
677 message = '\n'.join(str(process) for process in orphans)
mbligh5fa9e112009-08-03 16:46:06 +0000678 die_on_orphans = global_config.global_config.get_config_value(
679 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
680
681 if die_on_orphans:
682 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000683
showard170873e2009-01-07 00:22:26 +0000684
showard8cc058f2009-09-08 16:26:33 +0000685 def _recover_pending_entries(self):
686 for entry in self._get_unassigned_entries(
687 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000688 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000689 entry.on_pending()
690
691
showardb8900452009-10-12 20:31:01 +0000692 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000693 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000694 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
695 unrecovered_hqes = []
696 for queue_entry in queue_entries:
697 special_tasks = models.SpecialTask.objects.filter(
698 task__in=(models.SpecialTask.Task.CLEANUP,
699 models.SpecialTask.Task.VERIFY),
700 queue_entry__id=queue_entry.id,
701 is_complete=False)
702 if special_tasks.count() == 0:
703 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000704
showardb8900452009-10-12 20:31:01 +0000705 if unrecovered_hqes:
706 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700707 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000708 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000709 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000710
711
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800712 @_calls_log_tick_msg
showard65db3932009-10-28 19:54:35 +0000713 def _schedule_special_tasks(self):
714 """
715 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700716
717 Special tasks include PreJobTasks like verify, reset and cleanup.
718 They are created through _schedule_new_jobs and associated with a hqe
719 This method translates SpecialTasks to the appropriate AgentTask and
720 adds them to the dispatchers agents list, so _handle_agents can execute
721 them.
showard65db3932009-10-28 19:54:35 +0000722 """
Prashanth B4ec98672014-05-15 10:44:54 -0700723 # When the host scheduler is responsible for acquisition we only want
724 # to run tasks with leased hosts. All hqe tasks will already have
725 # leased hosts, and we don't want to run frontend tasks till the host
726 # scheduler has vetted the assignment. Note that this doesn't include
727 # frontend tasks with hosts leased by other active hqes.
728 for task in self._job_query_manager.get_prioritized_special_tasks(
729 only_tasks_with_leased_hosts=not _inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000730 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000731 continue
showardd1195652009-12-08 22:21:02 +0000732 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000733
734
showard170873e2009-01-07 00:22:26 +0000735 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000736 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000737 # should never happen
showarded2afea2009-07-07 20:54:07 +0000738 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000739 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000740 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700741 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000742 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000743
744
jadmanski0afbb632008-06-06 21:10:57 +0000745 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000746 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700747 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000748 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000749 if self.host_has_agent(host):
750 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000751 continue
showard8cc058f2009-09-08 16:26:33 +0000752 if self._host_has_scheduled_special_task(host):
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700753 # host will have a special task scheduled on the next tick
showard8cc058f2009-09-08 16:26:33 +0000754 continue
showard170873e2009-01-07 00:22:26 +0000755 if print_message:
showardb18134f2009-03-20 20:52:18 +0000756 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000757 models.SpecialTask.objects.create(
758 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000759 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000760
761
jadmanski0afbb632008-06-06 21:10:57 +0000762 def _recover_hosts(self):
763 # recover "Repair Failed" hosts
764 message = 'Reverifying dead host %s'
765 self._reverify_hosts_where("status = 'Repair Failed'",
766 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000767
768
showard89f84db2009-03-12 20:39:13 +0000769 def _refresh_pending_queue_entries(self):
770 """
771 Lookup the pending HostQueueEntries and call our HostScheduler
772 refresh() method given that list. Return the list.
773
774 @returns A list of pending HostQueueEntries sorted in priority order.
775 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700776 queue_entries = self._job_query_manager.get_pending_queue_entries(
777 only_hostless=not _inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000778 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000779 return []
showard89f84db2009-03-12 20:39:13 +0000780 return queue_entries
781
782
showarda9545c02009-12-18 22:44:26 +0000783 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800784 """Schedule a hostless (suite) job.
785
786 @param queue_entry: The queue_entry representing the hostless job.
787 """
showarda9545c02009-12-18 22:44:26 +0000788 self.add_agent_task(HostlessQueueTask(queue_entry))
Jakob Juelichd615a1e2014-09-04 11:48:36 -0700789
790 # Need to set execution_subdir before setting the status:
791 # After a restart of the scheduler, agents will be restored for HQEs in
792 # Starting, Running, Gathering, Parsing or Archiving. To do this, the
793 # execution_subdir is needed. Therefore it must be set before entering
794 # one of these states.
795 # Otherwise, if the scheduler was interrupted between setting the status
796 # and the execution_subdir, upon it's restart restoring agents would
797 # fail.
798 # Is there a way to get a status in one of these states without going
799 # through this code? Following cases are possible:
800 # - If it's aborted before being started:
801 # active bit will be 0, so there's nothing to parse, it will just be
802 # set to completed by _find_aborting. Critical statuses are skipped.
803 # - If it's aborted or it fails after being started:
804 # It was started, so this code was executed.
805 queue_entry.update_field('execution_subdir', 'hostless')
jamesren47bd7372010-03-13 00:58:17 +0000806 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000807
808
beepscc9fc702013-12-02 12:45:38 -0800809 def _schedule_host_job(self, host, queue_entry):
810 """Schedules a job on the given host.
811
812 1. Assign the host to the hqe, if it isn't already assigned.
813 2. Create a SpecialAgentTask for the hqe.
814 3. Activate the hqe.
815
816 @param queue_entry: The job to schedule.
817 @param host: The host to schedule the job on.
818 """
819 if self.host_has_agent(host):
820 host_agent_task = list(self._host_agents.get(host.id))[0].task
beepscc9fc702013-12-02 12:45:38 -0800821 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700822 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800823
824
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800825 @_calls_log_tick_msg
showard89f84db2009-03-12 20:39:13 +0000826 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700827 """
828 Find any new HQEs and call schedule_pre_job_tasks for it.
829
830 This involves setting the status of the HQE and creating a row in the
831 db corresponding the the special task, through
832 scheduler_models._queue_special_task. The new db row is then added as
833 an agent to the dispatcher through _schedule_special_tasks and
834 scheduled for execution on the drone through _handle_agents.
835 """
showard89f84db2009-03-12 20:39:13 +0000836 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000837
beepscc9fc702013-12-02 12:45:38 -0800838 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700839 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700840 new_jobs_with_hosts = 0
841 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800842 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700843 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000844
beepscc9fc702013-12-02 12:45:38 -0800845 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000846 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000847 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700848 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000849 else:
beepscc9fc702013-12-02 12:45:38 -0800850 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700851 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700852
Prathmesh Prabhued7ece92016-11-23 11:19:43 -0800853 metrics.Counter(
854 'chromeos/autotest/scheduler/scheduled_jobs_hostless'
855 ).increment_by(new_hostless_jobs)
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800856
beepscc9fc702013-12-02 12:45:38 -0800857 if not host_jobs:
858 return
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800859
Prathmesh Prabhu3c6a3bf2016-12-20 11:29:02 -0800860 if not _inline_host_acquisition:
861 # In this case, host_scheduler is responsible for scheduling
862 # host_jobs. Scheduling the jobs ourselves can lead to DB corruption
863 # since host_scheduler assumes it is the single process scheduling
864 # host jobs.
865 metrics.Gauge(
866 'chromeos/autotest/errors/scheduler/unexpected_host_jobs').set(
867 len(host_jobs))
868 return
869
Prashanth Bf66d51b2014-05-06 12:42:25 -0700870 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
871 for host_assignment in jobs_with_hosts:
872 self._schedule_host_job(host_assignment.host, host_assignment.job)
873 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800874
Prathmesh Prabhued7ece92016-11-23 11:19:43 -0800875 metrics.Counter(
876 'chromeos/autotest/scheduler/scheduled_jobs_with_hosts'
877 ).increment_by(new_jobs_with_hosts)
878 # TODO(pprabhu): Decide what to do about this metric. Million dollar
879 # question: What happens to jobs that were not matched. Do they stay in
880 # the queue, and get processed right here in the next tick (then we want
881 # a guage corresponding to the number of outstanding unmatched host
882 # jobs), or are they handled somewhere else (then we need a counter
883 # corresponding to failed_to_match_with_hosts jobs).
884 #autotest_stats.Gauge(key).send('new_jobs_without_hosts',
885 # new_jobs_need_hosts -
886 # new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000887
888
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800889 @_calls_log_tick_msg
showard8cc058f2009-09-08 16:26:33 +0000890 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700891 """
892 Adds agents to the dispatcher.
893
894 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
895 QueueTask for example, will have a job with a control file, and
896 the agent will have methods that poll, abort and check if the queue
897 task is finished. The dispatcher runs the agent_task, as well as
898 other agents in it's _agents member, through _handle_agents, by
899 calling the Agents tick().
900
901 This method creates an agent for each HQE in one of (starting, running,
902 gathering, parsing, archiving) states, and adds it to the dispatcher so
903 it is handled by _handle_agents.
904 """
showardd1195652009-12-08 22:21:02 +0000905 for agent_task in self._get_queue_entry_agent_tasks():
906 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000907
908
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800909 @_calls_log_tick_msg
showard8cc058f2009-09-08 16:26:33 +0000910 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000911 for entry in scheduler_models.HostQueueEntry.fetch(
912 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000913 task = entry.job.schedule_delayed_callback_task(entry)
914 if task:
showardd1195652009-12-08 22:21:02 +0000915 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000916
917
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800918 @_calls_log_tick_msg
jadmanski0afbb632008-06-06 21:10:57 +0000919 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700920 """
921 Looks through the afe_host_queue_entries for an aborted entry.
922
923 The aborted bit is set on an HQE in many ways, the most common
924 being when a user requests an abort through the frontend, which
925 results in an rpc from the afe to abort_host_queue_entries.
926 """
jamesrene7c65cb2010-06-08 20:38:10 +0000927 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000928 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700929 where='aborted=1 and complete=0'):
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800930
931 # If the job is running on a shard, let the shard handle aborting
932 # it and sync back the right status.
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -0800933 if entry.job.shard_id is not None and not server_utils.is_shard():
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800934 logging.info('Waiting for shard %s to abort hqe %s',
935 entry.job.shard_id, entry)
936 continue
937
showardf4a2e502009-07-28 20:06:39 +0000938 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800939
940 # The task would have started off with both is_complete and
941 # is_active = False. Aborted tasks are neither active nor complete.
942 # For all currently active tasks this will happen through the agent,
943 # but we need to manually update the special tasks that haven't
944 # started yet, because they don't have agents.
945 models.SpecialTask.objects.filter(is_active=False,
946 queue_entry_id=entry.id).update(is_complete=True)
947
showardd3dc1992009-04-22 21:01:40 +0000948 for agent in self.get_agents_for_entry(entry):
949 agent.abort()
950 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000951 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700952 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000953 for job in jobs_to_stop:
954 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000955
956
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800957 @_calls_log_tick_msg
beeps8bb1f7d2013-08-05 01:30:09 -0700958 def _find_aborted_special_tasks(self):
959 """
960 Find SpecialTasks that have been marked for abortion.
961
962 Poll the database looking for SpecialTasks that are active
963 and have been marked for abortion, then abort them.
964 """
965
966 # The completed and active bits are very important when it comes
967 # to scheduler correctness. The active bit is set through the prolog
968 # of a special task, and reset through the cleanup method of the
969 # SpecialAgentTask. The cleanup is called both through the abort and
970 # epilog. The complete bit is set in several places, and in general
971 # a hanging job will have is_active=1 is_complete=0, while a special
972 # task which completed will have is_active=0 is_complete=1. To check
973 # aborts we directly check active because the complete bit is set in
974 # several places, including the epilog of agent tasks.
975 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
976 is_aborted=True)
977 for task in aborted_tasks:
978 # There are 2 ways to get the agent associated with a task,
979 # through the host and through the hqe. A special task
980 # always needs a host, but doesn't always need a hqe.
981 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700982 if isinstance(agent.task, agent_task.SpecialAgentTask):
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000983
beeps8bb1f7d2013-08-05 01:30:09 -0700984 # The epilog preforms critical actions such as
985 # queueing the next SpecialTask, requeuing the
986 # hqe etc, however it doesn't actually kill the
987 # monitor process and set the 'done' bit. Epilogs
988 # assume that the job failed, and that the monitor
989 # process has already written an exit code. The
990 # done bit is a necessary condition for
991 # _handle_agents to schedule any more special
992 # tasks against the host, and it must be set
993 # in addition to is_active, is_complete and success.
994 agent.task.epilog()
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000995 agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700996
997
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700998 def _can_start_agent(self, agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000999 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +00001000 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +00001001 return True
1002 # don't allow any nonzero-process agents to run after we've reached a
1003 # limit (this avoids starvation of many-process agents)
1004 if have_reached_limit:
1005 return False
1006 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001007 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +00001008 agent.task.owner_username,
1009 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +00001010 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001011 return False
showard4c5374f2008-09-04 17:02:56 +00001012 return True
1013
1014
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -08001015 @_calls_log_tick_msg
jadmanski0afbb632008-06-06 21:10:57 +00001016 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -07001017 """
1018 Handles agents of the dispatcher.
1019
1020 Appropriate Agents are added to the dispatcher through
1021 _schedule_running_host_queue_entries. These agents each
1022 have a task. This method runs the agents task through
1023 agent.tick() leading to:
1024 agent.start
1025 prolog -> AgentTasks prolog
1026 For each queue entry:
1027 sets host status/status to Running
1028 set started_on in afe_host_queue_entries
1029 run -> AgentTasks run
1030 Creates PidfileRunMonitor
1031 Queues the autoserv command line for this AgentTask
1032 via the drone manager. These commands are executed
1033 through the drone managers execute actions.
1034 poll -> AgentTasks/BaseAgentTask poll
1035 checks the monitors exit_code.
1036 Executes epilog if task is finished.
1037 Executes AgentTasks _finish_task
1038 finish_task is usually responsible for setting the status
1039 of the HQE/host, and updating it's active and complete fileds.
1040
1041 agent.is_done
1042 Removed the agent from the dispatchers _agents queue.
1043 Is_done checks the finished bit on the agent, that is
1044 set based on the Agents task. During the agents poll
1045 we check to see if the monitor process has exited in
1046 it's finish method, and set the success member of the
1047 task based on this exit code.
1048 """
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001049 num_started_this_tick = 0
1050 num_finished_this_tick = 0
showard4c5374f2008-09-04 17:02:56 +00001051 have_reached_limit = False
1052 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -07001053 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +00001054 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -07001055 self._log_extra_msg('Processing Agent with Host Ids: %s and '
1056 'queue_entry ids:%s' % (agent.host_ids,
1057 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +00001058 if not agent.started:
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001059 if not self._can_start_agent(agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001060 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -07001061 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +00001062 continue
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001063 num_started_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001064 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +00001065 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -07001066 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +00001067 if agent.is_done():
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001068 num_finished_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001069 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +00001070 self.remove_agent(agent)
Prathmesh Prabhued7ece92016-11-23 11:19:43 -08001071
1072 metrics.Counter(
1073 'chromeos/autotest/scheduler/agent_processes_started'
1074 ).increment_by(num_started_this_tick)
1075 metrics.Counter(
1076 'chromeos/autotest/scheduler/agent_processes_finished'
1077 ).increment_by(num_finished_this_tick)
1078 num_agent_processes = _drone_manager.total_running_processes()
1079 metrics.Gauge(
1080 'chromeos/autotest/scheduler/agent_processes'
1081 ).set(num_agent_processes)
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001082 logging.info('%d running processes. %d added this tick.',
Prathmesh Prabhued7ece92016-11-23 11:19:43 -08001083 num_agent_processes, num_started_this_tick)
mbligh36768f02008-02-22 18:28:33 +00001084
1085
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -08001086 @_calls_log_tick_msg
showard29f7cd22009-04-29 21:16:24 +00001087 def _process_recurring_runs(self):
1088 recurring_runs = models.RecurringRun.objects.filter(
1089 start_date__lte=datetime.datetime.now())
1090 for rrun in recurring_runs:
1091 # Create job from template
1092 job = rrun.job
1093 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001094 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001095
1096 host_objects = info['hosts']
1097 one_time_hosts = info['one_time_hosts']
1098 metahost_objects = info['meta_hosts']
1099 dependencies = info['dependencies']
1100 atomic_group = info['atomic_group']
1101
1102 for host in one_time_hosts or []:
1103 this_host = models.Host.create_one_time_host(host.hostname)
1104 host_objects.append(this_host)
1105
1106 try:
1107 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001108 options=options,
showard29f7cd22009-04-29 21:16:24 +00001109 host_objects=host_objects,
1110 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001111 atomic_group=atomic_group)
1112
1113 except Exception, ex:
1114 logging.exception(ex)
1115 #TODO send email
1116
1117 if rrun.loop_count == 1:
1118 rrun.delete()
1119 else:
1120 if rrun.loop_count != 0: # if not infinite loop
1121 # calculate new start_date
1122 difference = datetime.timedelta(seconds=rrun.loop_period)
1123 rrun.start_date = rrun.start_date + difference
1124 rrun.loop_count -= 1
1125 rrun.save()
1126
1127
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -08001128 def _log_tick_msg(self, msg):
1129 if self._tick_debug:
1130 logging.debug(msg)
1131
1132
1133 def _log_extra_msg(self, msg):
1134 if self._extra_debugging:
1135 logging.debug(msg)
1136
1137
Simran Basia858a232012-08-21 11:04:37 -07001138SiteDispatcher = utils.import_site_class(
1139 __file__, 'autotest_lib.scheduler.site_monitor_db',
1140 'SiteDispatcher', BaseDispatcher)
1141
class Dispatcher(SiteDispatcher):
    """Concrete dispatcher class; site hooks come in via SiteDispatcher."""
    pass
1144
1145
mbligh36768f02008-02-22 18:28:33 +00001146class Agent(object):
showard77182562009-06-10 00:16:05 +00001147 """
Alex Miller47715eb2013-07-24 03:34:01 -07001148 An agent for use by the Dispatcher class to perform a task. An agent wraps
1149 around an AgentTask mainly to associate the AgentTask with the queue_entry
1150 and host ids.
showard77182562009-06-10 00:16:05 +00001151
1152 The following methods are required on all task objects:
1153 poll() - Called periodically to let the task check its status and
1154 update its internal state. If the task succeeded.
1155 is_done() - Returns True if the task is finished.
1156 abort() - Called when an abort has been requested. The task must
1157 set its aborted attribute to True if it actually aborted.
1158
1159 The following attributes are required on all task objects:
1160 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001161 success - bool, True if this task succeeded.
1162 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1163 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001164 """
1165
1166
showard418785b2009-11-23 20:19:59 +00001167 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001168 """
Alex Miller47715eb2013-07-24 03:34:01 -07001169 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001170 """
showard8cc058f2009-09-08 16:26:33 +00001171 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001172
showard77182562009-06-10 00:16:05 +00001173 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001174 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001175
showard8cc058f2009-09-08 16:26:33 +00001176 self.queue_entry_ids = task.queue_entry_ids
1177 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001178
showard8cc058f2009-09-08 16:26:33 +00001179 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001180 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001181
1182
jadmanski0afbb632008-06-06 21:10:57 +00001183 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001184 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001185 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001186 self.task.poll()
1187 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001188 self.finished = True
showardec113162008-05-08 00:52:49 +00001189
1190
jadmanski0afbb632008-06-06 21:10:57 +00001191 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001192 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001193
1194
showardd3dc1992009-04-22 21:01:40 +00001195 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001196 if self.task:
1197 self.task.abort()
1198 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001199 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001200 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001201
showardd3dc1992009-04-22 21:01:40 +00001202
beeps5e2bb4a2013-10-28 11:26:45 -07001203class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001204 """
1205 Common functionality for QueueTask and HostlessQueueTask
1206 """
1207 def __init__(self, queue_entries):
1208 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001209 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001210 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001211
1212
showard73ec0442009-02-07 02:05:20 +00001213 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001214 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001215
1216
jamesrenc44ae992010-02-19 00:12:54 +00001217 def _write_control_file(self, execution_path):
1218 control_path = _drone_manager.attach_file_to_execution(
1219 execution_path, self.job.control_file)
1220 return control_path
1221
1222
Aviv Keshet308e7362013-05-21 14:43:16 -07001223 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001224 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001225 execution_path = self.queue_entries[0].execution_path()
1226 control_path = self._write_control_file(execution_path)
1227 hostnames = ','.join(entry.host.hostname
1228 for entry in self.queue_entries
1229 if not entry.is_hostless())
1230
1231 execution_tag = self.queue_entries[0].execution_tag()
1232 params = _autoserv_command_line(
1233 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001234 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001235 _drone_manager.absolute_path(control_path)],
1236 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001237 if self.job.is_image_update_job():
1238 params += ['--image', self.job.update_image_path]
1239
jamesrenc44ae992010-02-19 00:12:54 +00001240 return params
showardd1195652009-12-08 22:21:02 +00001241
1242
1243 @property
1244 def num_processes(self):
1245 return len(self.queue_entries)
1246
1247
1248 @property
1249 def owner_username(self):
1250 return self.job.owner
1251
1252
1253 def _working_directory(self):
1254 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001255
1256
jadmanski0afbb632008-06-06 21:10:57 +00001257 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001258 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001259 keyval_dict = self.job.keyval_dict()
1260 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001261 group_name = self.queue_entries[0].get_group_name()
1262 if group_name:
1263 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001264 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001265 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001266 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001267 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001268
1269
showard35162b02009-03-03 02:17:30 +00001270 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001271 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001272 _drone_manager.write_lines_to_file(error_file_path,
1273 [_LOST_PROCESS_ERROR])
1274
1275
showardd3dc1992009-04-22 21:01:40 +00001276 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001277 if not self.monitor:
1278 return
1279
showardd9205182009-04-27 20:09:55 +00001280 self._write_job_finished()
1281
showard35162b02009-03-03 02:17:30 +00001282 if self.monitor.lost_process:
1283 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001284
jadmanskif7fa2cc2008-10-01 14:13:23 +00001285
showardcbd74612008-11-19 21:42:02 +00001286 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001287 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001288 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001289 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001290 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001291
1292
jadmanskif7fa2cc2008-10-01 14:13:23 +00001293 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001294 if not self.monitor or not self.monitor.has_process():
1295 return
1296
jadmanskif7fa2cc2008-10-01 14:13:23 +00001297 # build up sets of all the aborted_by and aborted_on values
1298 aborted_by, aborted_on = set(), set()
1299 for queue_entry in self.queue_entries:
1300 if queue_entry.aborted_by:
1301 aborted_by.add(queue_entry.aborted_by)
1302 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1303 aborted_on.add(t)
1304
1305 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001306 # TODO(showard): this conditional is now obsolete, we just need to leave
1307 # it in temporarily for backwards compatibility over upgrades. delete
1308 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001309 assert len(aborted_by) <= 1
1310 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001311 aborted_by_value = aborted_by.pop()
1312 aborted_on_value = max(aborted_on)
1313 else:
1314 aborted_by_value = 'autotest_system'
1315 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001316
showarda0382352009-02-11 23:36:43 +00001317 self._write_keyval_after_job("aborted_by", aborted_by_value)
1318 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001319
showardcbd74612008-11-19 21:42:02 +00001320 aborted_on_string = str(datetime.datetime.fromtimestamp(
1321 aborted_on_value))
1322 self._write_status_comment('Job aborted by %s on %s' %
1323 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001324
1325
jadmanski0afbb632008-06-06 21:10:57 +00001326 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001327 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001328 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001329 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001330
1331
jadmanski0afbb632008-06-06 21:10:57 +00001332 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001333 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001334 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001335
1336
class QueueTask(AbstractQueueTask):
    """A queue task that runs a job against one or more real hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Validate entry/host statuses, then mark hosts running and dirty."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)


    def _finish_task(self):
        super(QueueTask, self)._finish_task()

        # Hand the entries off to the gathering stage; hosts stay running.
        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Extend the base command line with SSP and parent-job options."""
        invocation = super(QueueTask, self)._command_line()
        # Check if server-side packaging is needed.
        # NOTE: '!= False' (not 'is not False') is kept deliberately so that
        # DB-backed boolean values such as 0 are treated as False here.
        if (_enable_ssp_container and
            self.job.control_type == control_data.CONTROL_TYPE.SERVER and
            self.job.require_ssp != False):
            invocation += ['--require-ssp']
            source_build = self.job.keyval_dict().get('test_source_build', None)
            if source_build:
                invocation += ['--test_source_build', source_build]
        if self.job.parent_job_id:
            invocation += ['--parent_job_id', str(self.job.parent_job_id)]
        return invocation + ['--verify_job_repo_url']
Alex Miller9f01d5d2013-08-08 02:26:01 -07001381
1382
Dan Shi1a189052013-10-28 14:41:35 -07001383class HostlessQueueTask(AbstractQueueTask):
mbligh4608b002010-01-05 18:22:35 +00001384 def __init__(self, queue_entry):
1385 super(HostlessQueueTask, self).__init__([queue_entry])
1386 self.queue_entry_ids = [queue_entry.id]
1387
1388
1389 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001390 super(HostlessQueueTask, self).prolog()
1391
1392
mbligh4608b002010-01-05 18:22:35 +00001393 def _finish_task(self):
1394 super(HostlessQueueTask, self)._finish_task()
Dan Shi76af8022013-10-19 01:59:49 -07001395
1396 # When a job is added to database, its initial status is always
1397 # Starting. In a scheduler tick, scheduler finds all jobs in Starting
1398 # status, check if any of them can be started. If scheduler hits some
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001399 # limit, e.g., max_hostless_jobs_per_drone, scheduler will
1400 # leave these jobs in Starting status. Otherwise, the jobs'
1401 # status will be changed to Running, and an autoserv process
1402 # will be started in drone for each of these jobs.
Dan Shi76af8022013-10-19 01:59:49 -07001403 # If the entry is still in status Starting, the process has not started
1404 # yet. Therefore, there is no need to parse and collect log. Without
1405 # this check, exception will be raised by scheduler as execution_subdir
1406 # for this queue entry does not have a value yet.
1407 hqe = self.queue_entries[0]
1408 if hqe.status != models.HostQueueEntry.Status.STARTING:
1409 hqe.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001410
1411
mbligh36768f02008-02-22 18:28:33 +00001412if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00001413 main()