blob: e8a00fab4769278a8908a90bce4461eae37d5247 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Richard Barnetteffed1722016-05-18 15:57:22 -07002
3#pylint: disable=C0111
4
mbligh36768f02008-02-22 18:28:33 +00005"""
6Autotest scheduler
7"""
showard909c7a62008-07-15 21:52:38 +00008
Dan Shif6c65bd2014-08-29 16:15:07 -07009import datetime
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -080010import functools
Dan Shif6c65bd2014-08-29 16:15:07 -070011import gc
12import logging
13import optparse
14import os
15import signal
16import sys
17import time
showard402934a2009-12-21 22:20:47 +000018
Alex Miller05d7b4c2013-03-04 07:49:38 -080019import common
showard21baa452008-10-21 00:08:39 +000020from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000021
22import django.db
Aviv Keshet65fed072016-06-29 10:20:55 -070023from chromite.lib import metrics
Richard Barnetteffed1722016-05-18 15:57:22 -070024from chromite.lib import ts_mon_config
showard402934a2009-12-21 22:20:47 +000025
Dan Shiec1d47d2015-02-13 11:38:13 -080026from autotest_lib.client.common_lib import control_data
Prashanth B0e960282014-05-13 19:38:28 -070027from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070028from autotest_lib.client.common_lib import utils
Prashanth B0e960282014-05-13 19:38:28 -070029from autotest_lib.frontend.afe import models, rpc_utils
Fang Dengc330bee2014-10-21 18:10:55 -070030from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070031from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
32from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070033from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070034from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070035from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000036from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080037from autotest_lib.scheduler import status_server, scheduler_config
Aviv Keshet308e7362013-05-21 14:43:16 -070038from autotest_lib.server import autoserv_utils
Dan Shi114e1722016-01-10 18:12:53 -080039from autotest_lib.server import system_utils
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080040from autotest_lib.server import utils as server_utils
Dan Shicf2e8dd2015-05-07 17:18:48 -070041from autotest_lib.site_utils import metadata_reporter
Dan Shib9144a42014-12-01 16:09:32 -080042from autotest_lib.site_utils import server_manager_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080043
Dan Shicf2e8dd2015-05-07 17:18:48 -070044
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Directory results are written to; overridden by the positional CLI argument
# in main_without_exception_handling().
RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the autotest root to be overridden from the environment.
# NOTE: was `os.environ.has_key(...)`; `in` is equivalent and has_key is
# deprecated (and removed in Python 3).
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

# Make the server code importable regardless of how we were launched.
if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000058
# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# DB connection manager and connection; both populated in initialize().
_db_manager = None
_db = None
# Flipped to True by handle_signal() on SIGINT/SIGTERM; the main loop in
# main_without_exception_handling() polls it to exit cleanly.
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
# Set to True by the --test CLI flag (see main_without_exception_handling).
_testing_mode = False
# Drone manager singleton; populated in initialize_globals().
_drone_manager = None
mbligh36768f02008-02-22 18:28:33 +000076
def _site_init_monitor_db_dummy():
    """Fallback site-init hook used when no site_monitor_db module exists."""
    return {}
79
80
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled without a default set."""
    if not models.DroneSet.drone_sets_enabled():
        return
    if not models.DroneSet.default_drone_set_name():
        raise scheduler_lib.SchedulerError(
                'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000086
87
def _sanity_check():
    """Verify config consistency; must succeed before the scheduler starts."""
    _verify_default_drone_set_exists()
91
92
def main():
    """Top-level entry point.

    Runs the scheduler, logs any exception that escapes (re-raising it so
    the process still dies), and always removes the scheduler pid file.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000104
105
def main_without_exception_handling():
    """Parse arguments, configure the process, and run the scheduler loop.

    Returns early on bad CLI usage; calls sys.exit(1) when the scheduler is
    disabled in the global config.  After the tick loop ends (shutdown signal
    or uncaught exception), tears down the metadata reporter, status server,
    drone manager and DB connection.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    # Exactly one positional argument (results_dir) is required.
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Optional site-specific initialization hook; defaults to a no-op.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    # --test swaps in a dummy autoserv binary via the module globals.
    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    # Start the thread to report metadata.
    metadata_reporter.start()

    with ts_mon_config.SetupTsMonGlobalState('autotest_scheduler',
                                             indirect=True):
        try:
            initialize()
            dispatcher = Dispatcher()
            dispatcher.initialize(recover_hosts=options.recover_hosts)
            minimum_tick_sec = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)

            # Main loop: tick, then sleep so each tick takes at least
            # minimum_tick_sec.  Exits when a shutdown signal was received or
            # the status server requested a scheduler shutdown.
            while not _shutdown and not server._shutdown_scheduler:
                start = time.time()
                dispatcher.tick()
                curr_tick_sec = time.time() - start
                if minimum_tick_sec > curr_tick_sec:
                    time.sleep(minimum_tick_sec - curr_tick_sec)
                else:
                    time.sleep(0.0001)
        except server_manager_utils.ServerActionError as e:
            # This error is expected when the server is not in primary status
            # for scheduler role. Thus do not send email for it.
            logging.exception(e)
        except Exception:
            email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    # Teardown: runs on every exit path of the with-block above.
    metadata_reporter.abort()
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000199
200
Prashanth B4ec98672014-05-15 10:44:54 -0700201def handle_signal(signum, frame):
jadmanski0afbb632008-06-06 21:10:57 +0000202 global _shutdown
203 _shutdown = True
showardb18134f2009-03-20 20:52:18 +0000204 logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000205
206
def initialize():
    """One-time process initialization before the first tick.

    Writes the pid file (aborting if another monitor_db is alive), opens the
    DB connection, installs signal handlers, and initializes the drone
    manager and scheduler models.  Order matters: the pid check must precede
    everything else, and signal handlers are installed only after the DB
    connection exists.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to run two schedulers at once.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    # Under --test, point at the stress-test database instead of production.
    if _testing_mode:
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    # If server database is enabled, check if the server has role `scheduler`.
    # If the server does not have scheduler role, exception will be raised and
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    # Hand the drone manager its drone list and the host results land on.
    drone_list = system_utils.get_drones()
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000246
247
def initialize_globals():
    """Populate module-level singletons (currently only the drone manager)."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
251
252
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """Build the autoserv command line for a job.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns The autoserv command line as a list of executable + parameters.
    """
    return autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose, in_lab=True)
showard87ba02a2009-04-20 19:37:32 +0000271
def _calls_log_tick_msg(func):
    """Decorator for BaseDispatcher.tick helpers: logs 'Starting <name>'.

    Emits the message through self._log_tick_msg before delegating to the
    wrapped method, so each tick step is traceable in the debug log.
    """
    @functools.wraps(func)
    def _traced(self, *args, **kwargs):
        self._log_tick_msg('Starting %s' % func.__name__)
        return func(self, *args, **kwargs)

    return _traced
280
showard87ba02a2009-04-20 19:37:32 +0000281
Simran Basia858a232012-08-21 11:04:37 -0700282class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800283
284
    def __init__(self):
        """Read scheduler config and set up per-dispatcher state.

        Requires initialize() to have run first: relies on the module-level
        _db and _drone_manager globals being populated.
        """
        # Active Agent objects, plus per-host and per-queue-entry indexes
        # maintained by _register_agent_for_ids/_unregister_agent_for_ids.
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db, _drone_manager)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        # Throttle for _garbage_collection: stats are logged at most once per
        # gc_stats_interval_mins (default 6 hours).
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)
        self._inline_host_acquisition = (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'inline_host_acquisition', type=bool, default=True))

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if self._inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
320
mbligh36768f02008-02-22 18:28:33 +0000321
    def initialize(self, recover_hosts=True):
        """One-time dispatcher setup: run cleanups and recover state.

        @param recover_hosts: when True, also run _recover_hosts() after
                process recovery.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()
        # Execute all actions queued in the cleanup tasks. Scheduler tick will
        # run a refresh task first. If there is any action in the queue, refresh
        # will raise an exception.
        _drone_manager.execute_actions()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000335
336
    # TODO(pprabhu) Drop this metric once tick_times has been verified.
    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/scheduler/tick_durations/tick')
    def tick(self):
        """Run one full scheduler iteration.

        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.  Each step is timed via breakdown_timer.Step; the
        step order is significant (see the _run_cleanup comment below).
        """
        with metrics.RuntimeBreakdownTimer(
                'chromeos/autotest/scheduler/tick_times') as breakdown_timer:
            self._log_tick_msg('New tick')
            system_utils.DroneCache.refresh()

            with breakdown_timer.Step('garbage_collection'):
                self._garbage_collection()
            with breakdown_timer.Step('trigger_refresh'):
                self._log_tick_msg('Starting _drone_manager.trigger_refresh')
                _drone_manager.trigger_refresh()
            with breakdown_timer.Step('process_recurring_runs'):
                self._process_recurring_runs()
            with breakdown_timer.Step('schedule_delay_tasks'):
                self._schedule_delay_tasks()
            with breakdown_timer.Step('schedule_running_host_queue_entries'):
                self._schedule_running_host_queue_entries()
            with breakdown_timer.Step('schedule_special_tasks'):
                self._schedule_special_tasks()
            with breakdown_timer.Step('schedule_new_jobs'):
                self._schedule_new_jobs()
            with breakdown_timer.Step('sync_refresh'):
                self._log_tick_msg('Starting _drone_manager.sync_refresh')
                _drone_manager.sync_refresh()
            # _run_cleanup must be called between drone_manager.sync_refresh,
            # and drone_manager.execute_actions, as sync_refresh will clear the
            # calls queued in drones. Therefore, any action that calls
            # drone.queue_call to add calls to the drone._calls, should be after
            # drone refresh is completed and before
            # drone_manager.execute_actions at the end of the tick.
            with breakdown_timer.Step('run_cleanup'):
                self._run_cleanup()
            with breakdown_timer.Step('find_aborting'):
                self._find_aborting()
            with breakdown_timer.Step('find_aborted_special_tasks'):
                self._find_aborted_special_tasks()
            with breakdown_timer.Step('handle_agents'):
                self._handle_agents()
            with breakdown_timer.Step('host_scheduler_tick'):
                self._log_tick_msg('Starting _host_scheduler.tick')
                self._host_scheduler.tick()
            with breakdown_timer.Step('drones_execute_actions'):
                self._log_tick_msg('Starting _drone_manager.execute_actions')
                _drone_manager.execute_actions()
            with breakdown_timer.Step('send_queued_emails'):
                self._log_tick_msg(
                        'Starting email_manager.manager.send_queued_emails')
                email_manager.manager.send_queued_emails()
            with breakdown_timer.Step('db_reset_queries'):
                self._log_tick_msg('Starting django.db.reset_queries')
                django.db.reset_queries()

            self._tick_count += 1
            metrics.Counter('chromeos/autotest/scheduler/tick').increment()
mbligh36768f02008-02-22 18:28:33 +0000399
showard97aed502008-11-04 02:01:24 +0000400
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800401 @_calls_log_tick_msg
mblighf3294cc2009-04-08 21:17:38 +0000402 def _run_cleanup(self):
403 self._periodic_cleanup.run_cleanup_maybe()
404 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000405
mbligh36768f02008-02-22 18:28:33 +0000406
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800407 @_calls_log_tick_msg
showardf13a9e22009-12-18 22:54:09 +0000408 def _garbage_collection(self):
409 threshold_time = time.time() - self._seconds_between_garbage_stats
410 if threshold_time < self._last_garbage_stats_time:
411 # Don't generate these reports very often.
412 return
413
414 self._last_garbage_stats_time = time.time()
415 # Force a full level 0 collection (because we can, it doesn't hurt
416 # at this interval).
417 gc.collect()
418 logging.info('Logging garbage collector stats on tick %d.',
419 self._tick_count)
420 gc_stats._log_garbage_collector_stats()
421
422
showard170873e2009-01-07 00:22:26 +0000423 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
424 for object_id in object_ids:
425 agent_dict.setdefault(object_id, set()).add(agent)
426
427
428 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
429 for object_id in object_ids:
430 assert object_id in agent_dict
431 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700432 # If an ID has no more active agent associated, there is no need to
433 # keep it in the dictionary. Otherwise, scheduler will keep an
434 # unnecessarily big dictionary until being restarted.
435 if not agent_dict[object_id]:
436 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000437
438
showardd1195652009-12-08 22:21:02 +0000439 def add_agent_task(self, agent_task):
beeps8bb1f7d2013-08-05 01:30:09 -0700440 """
441 Creates and adds an agent to the dispatchers list.
442
443 In creating the agent we also pass on all the queue_entry_ids and
444 host_ids from the special agent task. For every agent we create, we
445 add it to 1. a dict against the queue_entry_ids given to it 2. A dict
446 against the host_ids given to it. So theoritically, a host can have any
447 number of agents associated with it, and each of them can have any
448 special agent task, though in practice we never see > 1 agent/task per
449 host at any time.
450
451 @param agent_task: A SpecialTask for the agent to manage.
452 """
showardd1195652009-12-08 22:21:02 +0000453 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000454 self._agents.append(agent)
455 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000456 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
457 self._register_agent_for_ids(self._queue_entry_agents,
458 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000459
showard170873e2009-01-07 00:22:26 +0000460
461 def get_agents_for_entry(self, queue_entry):
462 """
463 Find agents corresponding to the specified queue_entry.
464 """
showardd3dc1992009-04-22 21:01:40 +0000465 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000466
467
468 def host_has_agent(self, host):
469 """
470 Determine if there is currently an Agent present using this host.
471 """
472 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000473
474
jadmanski0afbb632008-06-06 21:10:57 +0000475 def remove_agent(self, agent):
476 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000477 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
478 agent)
479 self._unregister_agent_for_ids(self._queue_entry_agents,
480 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000481
482
showard8cc058f2009-09-08 16:26:33 +0000483 def _host_has_scheduled_special_task(self, host):
484 return bool(models.SpecialTask.objects.filter(host__id=host.id,
485 is_active=False,
486 is_complete=False))
487
488
    def _recover_processes(self):
        """Reattach to processes left over from a previous scheduler run.

        Order matters: pidfiles must be registered before the drone refresh,
        and drones are reinitialized only after orphaned processes have been
        dealt with.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000501
showard170873e2009-01-07 00:22:26 +0000502
showardd1195652009-12-08 22:21:02 +0000503 def _create_recovery_agent_tasks(self):
504 return (self._get_queue_entry_agent_tasks()
505 + self._get_special_task_agent_tasks(is_active=True))
506
507
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        # Tally of entries per status, reported as a gauge after the loop.
        hqe_count_by_status = {}
        for entry in queue_entries:
            hqe_count_by_status[entry.status] = (
                hqe_count_by_status.get(entry.status, 0) + 1)
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            # One AgentTask may cover the whole synchronous group; mark all
            # of its entries as used so they are not picked up again.
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)

        for status, count in hqe_count_by_status.iteritems():
            metrics.Gauge(
                'chromeos/autotest/scheduler/active_host_queue_entries'
            ).set(count, fields={'status': status})

        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000553
554
showardd1195652009-12-08 22:21:02 +0000555 def _get_special_task_agent_tasks(self, is_active=False):
556 special_tasks = models.SpecialTask.objects.filter(
557 is_active=is_active, is_complete=False)
558 return [self._get_agent_task_for_special_task(task)
559 for task in special_tasks]
560
561
562 def _get_agent_task_for_queue_entry(self, queue_entry):
563 """
beeps8bb1f7d2013-08-05 01:30:09 -0700564 Construct an AgentTask instance for the given active HostQueueEntry.
565
showardd1195652009-12-08 22:21:02 +0000566 @param queue_entry: a HostQueueEntry
beeps8bb1f7d2013-08-05 01:30:09 -0700567 @return: an AgentTask to run the queue entry
showardd1195652009-12-08 22:21:02 +0000568 """
569 task_entries = queue_entry.job.get_group_entries(queue_entry)
570 self._check_for_duplicate_host_entries(task_entries)
571
572 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
573 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000574 if queue_entry.is_hostless():
575 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000576 return QueueTask(queue_entries=task_entries)
577 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
beeps5e2bb4a2013-10-28 11:26:45 -0700578 return postjob_task.GatherLogsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000579 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
beeps5e2bb4a2013-10-28 11:26:45 -0700580 return postjob_task.FinalReparseTask(queue_entries=task_entries)
mbligh4608b002010-01-05 18:22:35 +0000581 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
beeps5e2bb4a2013-10-28 11:26:45 -0700582 return postjob_task.ArchiveResultsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000583
Prashanth B0e960282014-05-13 19:38:28 -0700584 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800585 '_get_agent_task_for_queue_entry got entry with '
586 'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000587
588
589 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000590 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
591 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000592 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000593 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000594 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000595 if using_host:
showardd1195652009-12-08 22:21:02 +0000596 self._assert_host_has_no_agent(task_entry)
597
598
599 def _assert_host_has_no_agent(self, entry):
600 """
601 @param entry: a HostQueueEntry or a SpecialTask
602 """
603 if self.host_has_agent(entry.host):
604 agent = tuple(self._host_agents.get(entry.host.id))[0]
Prashanth B0e960282014-05-13 19:38:28 -0700605 raise scheduler_lib.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000606 'While scheduling %s, host %s already has a host agent %s'
607 % (entry, entry.host, agent.task))
608
609
610 def _get_agent_task_for_special_task(self, special_task):
611 """
612 Construct an AgentTask class to run the given SpecialTask and add it
613 to this dispatcher.
beeps8bb1f7d2013-08-05 01:30:09 -0700614
MK Ryu35d661e2014-09-25 17:44:10 -0700615 A special task is created through schedule_special_tasks, but only if
beeps8bb1f7d2013-08-05 01:30:09 -0700616 the host doesn't already have an agent. This happens through
617 add_agent_task. All special agent tasks are given a host on creation,
618 and a Null hqe. To create a SpecialAgentTask object, you need a
619 models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
620 object contains a hqe it's passed on to the special agent task, which
621 creates a HostQueueEntry and saves it as it's queue_entry.
622
showardd1195652009-12-08 22:21:02 +0000623 @param special_task: a models.SpecialTask instance
624 @returns an AgentTask to run this SpecialTask
625 """
626 self._assert_host_has_no_agent(special_task)
627
beeps5e2bb4a2013-10-28 11:26:45 -0700628 special_agent_task_classes = (prejob_task.CleanupTask,
629 prejob_task.VerifyTask,
630 prejob_task.RepairTask,
631 prejob_task.ResetTask,
632 prejob_task.ProvisionTask)
633
showardd1195652009-12-08 22:21:02 +0000634 for agent_task_class in special_agent_task_classes:
635 if agent_task_class.TASK_TYPE == special_task.task:
636 return agent_task_class(task=special_task)
637
Prashanth B0e960282014-05-13 19:38:28 -0700638 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800639 'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000640
641
642 def _register_pidfiles(self, agent_tasks):
643 for agent_task in agent_tasks:
644 agent_task.register_necessary_pidfiles()
645
646
647 def _recover_tasks(self, agent_tasks):
648 orphans = _drone_manager.get_orphaned_autoserv_processes()
649
650 for agent_task in agent_tasks:
651 agent_task.recover()
652 if agent_task.monitor and agent_task.monitor.has_process():
653 orphans.discard(agent_task.monitor.get_process())
654 self.add_agent_task(agent_task)
655
656 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000657
658
showard8cc058f2009-09-08 16:26:33 +0000659 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000660 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
661 % status):
showard0db3d432009-10-12 20:29:15 +0000662 if entry.status == status and not self.get_agents_for_entry(entry):
663 # The status can change during iteration, e.g., if job.run()
664 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000665 yield entry
666
667
showard6878e8b2009-07-20 22:37:45 +0000668 def _check_for_remaining_orphan_processes(self, orphans):
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800669 m = 'chromeos/autotest/errors/unrecovered_orphan_processes'
670 metrics.Gauge(m).set(len(orphans))
671
showard6878e8b2009-07-20 22:37:45 +0000672 if not orphans:
673 return
674 subject = 'Unrecovered orphan autoserv processes remain'
675 message = '\n'.join(str(process) for process in orphans)
mbligh5fa9e112009-08-03 16:46:06 +0000676 die_on_orphans = global_config.global_config.get_config_value(
677 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
678
679 if die_on_orphans:
680 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000681
showard170873e2009-01-07 00:22:26 +0000682
showard8cc058f2009-09-08 16:26:33 +0000683 def _recover_pending_entries(self):
684 for entry in self._get_unassigned_entries(
685 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000686 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000687 entry.on_pending()
688
689
showardb8900452009-10-12 20:31:01 +0000690 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000691 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000692 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
693 unrecovered_hqes = []
694 for queue_entry in queue_entries:
695 special_tasks = models.SpecialTask.objects.filter(
696 task__in=(models.SpecialTask.Task.CLEANUP,
697 models.SpecialTask.Task.VERIFY),
698 queue_entry__id=queue_entry.id,
699 is_complete=False)
700 if special_tasks.count() == 0:
701 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000702
showardb8900452009-10-12 20:31:01 +0000703 if unrecovered_hqes:
704 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700705 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000706 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000707 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000708
709
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800710 @_calls_log_tick_msg
showard65db3932009-10-28 19:54:35 +0000711 def _schedule_special_tasks(self):
712 """
713 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700714
715 Special tasks include PreJobTasks like verify, reset and cleanup.
716 They are created through _schedule_new_jobs and associated with a hqe
717 This method translates SpecialTasks to the appropriate AgentTask and
718 adds them to the dispatchers agents list, so _handle_agents can execute
719 them.
showard65db3932009-10-28 19:54:35 +0000720 """
Prashanth B4ec98672014-05-15 10:44:54 -0700721 # When the host scheduler is responsible for acquisition we only want
722 # to run tasks with leased hosts. All hqe tasks will already have
723 # leased hosts, and we don't want to run frontend tasks till the host
724 # scheduler has vetted the assignment. Note that this doesn't include
725 # frontend tasks with hosts leased by other active hqes.
726 for task in self._job_query_manager.get_prioritized_special_tasks(
Prathmesh Prabhu468c32c2016-12-20 15:12:30 -0800727 only_tasks_with_leased_hosts=not self._inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000728 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000729 continue
showardd1195652009-12-08 22:21:02 +0000730 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000731
732
showard170873e2009-01-07 00:22:26 +0000733 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000734 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000735 # should never happen
showarded2afea2009-07-07 20:54:07 +0000736 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000737 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000738 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700739 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000740 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000741
742
jadmanski0afbb632008-06-06 21:10:57 +0000743 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000744 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700745 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000746 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000747 if self.host_has_agent(host):
748 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000749 continue
showard8cc058f2009-09-08 16:26:33 +0000750 if self._host_has_scheduled_special_task(host):
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700751 # host will have a special task scheduled on the next tick
showard8cc058f2009-09-08 16:26:33 +0000752 continue
showard170873e2009-01-07 00:22:26 +0000753 if print_message:
showardb18134f2009-03-20 20:52:18 +0000754 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000755 models.SpecialTask.objects.create(
756 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000757 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000758
759
jadmanski0afbb632008-06-06 21:10:57 +0000760 def _recover_hosts(self):
761 # recover "Repair Failed" hosts
762 message = 'Reverifying dead host %s'
763 self._reverify_hosts_where("status = 'Repair Failed'",
764 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000765
766
showard89f84db2009-03-12 20:39:13 +0000767 def _refresh_pending_queue_entries(self):
768 """
769 Lookup the pending HostQueueEntries and call our HostScheduler
770 refresh() method given that list. Return the list.
771
772 @returns A list of pending HostQueueEntries sorted in priority order.
773 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700774 queue_entries = self._job_query_manager.get_pending_queue_entries(
Prathmesh Prabhu468c32c2016-12-20 15:12:30 -0800775 only_hostless=not self._inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000776 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000777 return []
showard89f84db2009-03-12 20:39:13 +0000778 return queue_entries
779
780
showarda9545c02009-12-18 22:44:26 +0000781 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800782 """Schedule a hostless (suite) job.
783
784 @param queue_entry: The queue_entry representing the hostless job.
785 """
showarda9545c02009-12-18 22:44:26 +0000786 self.add_agent_task(HostlessQueueTask(queue_entry))
Jakob Juelichd615a1e2014-09-04 11:48:36 -0700787
788 # Need to set execution_subdir before setting the status:
789 # After a restart of the scheduler, agents will be restored for HQEs in
790 # Starting, Running, Gathering, Parsing or Archiving. To do this, the
791 # execution_subdir is needed. Therefore it must be set before entering
792 # one of these states.
793 # Otherwise, if the scheduler was interrupted between setting the status
794 # and the execution_subdir, upon it's restart restoring agents would
795 # fail.
796 # Is there a way to get a status in one of these states without going
797 # through this code? Following cases are possible:
798 # - If it's aborted before being started:
799 # active bit will be 0, so there's nothing to parse, it will just be
800 # set to completed by _find_aborting. Critical statuses are skipped.
801 # - If it's aborted or it fails after being started:
802 # It was started, so this code was executed.
803 queue_entry.update_field('execution_subdir', 'hostless')
jamesren47bd7372010-03-13 00:58:17 +0000804 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000805
806
beepscc9fc702013-12-02 12:45:38 -0800807 def _schedule_host_job(self, host, queue_entry):
808 """Schedules a job on the given host.
809
810 1. Assign the host to the hqe, if it isn't already assigned.
811 2. Create a SpecialAgentTask for the hqe.
812 3. Activate the hqe.
813
814 @param queue_entry: The job to schedule.
815 @param host: The host to schedule the job on.
816 """
817 if self.host_has_agent(host):
818 host_agent_task = list(self._host_agents.get(host.id))[0].task
beepscc9fc702013-12-02 12:45:38 -0800819 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700820 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800821
822
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800823 @_calls_log_tick_msg
showard89f84db2009-03-12 20:39:13 +0000824 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700825 """
826 Find any new HQEs and call schedule_pre_job_tasks for it.
827
828 This involves setting the status of the HQE and creating a row in the
829 db corresponding the the special task, through
830 scheduler_models._queue_special_task. The new db row is then added as
831 an agent to the dispatcher through _schedule_special_tasks and
832 scheduled for execution on the drone through _handle_agents.
833 """
showard89f84db2009-03-12 20:39:13 +0000834 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000835
beepscc9fc702013-12-02 12:45:38 -0800836 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700837 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700838 new_jobs_with_hosts = 0
839 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800840 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700841 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000842
beepscc9fc702013-12-02 12:45:38 -0800843 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000844 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000845 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700846 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000847 else:
beepscc9fc702013-12-02 12:45:38 -0800848 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700849 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700850
Prathmesh Prabhued7ece92016-11-23 11:19:43 -0800851 metrics.Counter(
852 'chromeos/autotest/scheduler/scheduled_jobs_hostless'
853 ).increment_by(new_hostless_jobs)
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800854
beepscc9fc702013-12-02 12:45:38 -0800855 if not host_jobs:
856 return
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800857
Prathmesh Prabhu468c32c2016-12-20 15:12:30 -0800858 if not self._inline_host_acquisition:
Prathmesh Prabhu3c6a3bf2016-12-20 11:29:02 -0800859 # In this case, host_scheduler is responsible for scheduling
860 # host_jobs. Scheduling the jobs ourselves can lead to DB corruption
861 # since host_scheduler assumes it is the single process scheduling
862 # host jobs.
863 metrics.Gauge(
864 'chromeos/autotest/errors/scheduler/unexpected_host_jobs').set(
865 len(host_jobs))
866 return
867
Prashanth Bf66d51b2014-05-06 12:42:25 -0700868 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
869 for host_assignment in jobs_with_hosts:
870 self._schedule_host_job(host_assignment.host, host_assignment.job)
871 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800872
Prathmesh Prabhued7ece92016-11-23 11:19:43 -0800873 metrics.Counter(
874 'chromeos/autotest/scheduler/scheduled_jobs_with_hosts'
875 ).increment_by(new_jobs_with_hosts)
876 # TODO(pprabhu): Decide what to do about this metric. Million dollar
877 # question: What happens to jobs that were not matched. Do they stay in
878 # the queue, and get processed right here in the next tick (then we want
879 # a guage corresponding to the number of outstanding unmatched host
880 # jobs), or are they handled somewhere else (then we need a counter
881 # corresponding to failed_to_match_with_hosts jobs).
882 #autotest_stats.Gauge(key).send('new_jobs_without_hosts',
883 # new_jobs_need_hosts -
884 # new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000885
886
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800887 @_calls_log_tick_msg
showard8cc058f2009-09-08 16:26:33 +0000888 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700889 """
890 Adds agents to the dispatcher.
891
892 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
893 QueueTask for example, will have a job with a control file, and
894 the agent will have methods that poll, abort and check if the queue
895 task is finished. The dispatcher runs the agent_task, as well as
896 other agents in it's _agents member, through _handle_agents, by
897 calling the Agents tick().
898
899 This method creates an agent for each HQE in one of (starting, running,
900 gathering, parsing, archiving) states, and adds it to the dispatcher so
901 it is handled by _handle_agents.
902 """
showardd1195652009-12-08 22:21:02 +0000903 for agent_task in self._get_queue_entry_agent_tasks():
904 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000905
906
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800907 @_calls_log_tick_msg
showard8cc058f2009-09-08 16:26:33 +0000908 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000909 for entry in scheduler_models.HostQueueEntry.fetch(
910 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000911 task = entry.job.schedule_delayed_callback_task(entry)
912 if task:
showardd1195652009-12-08 22:21:02 +0000913 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000914
915
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800916 @_calls_log_tick_msg
jadmanski0afbb632008-06-06 21:10:57 +0000917 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700918 """
919 Looks through the afe_host_queue_entries for an aborted entry.
920
921 The aborted bit is set on an HQE in many ways, the most common
922 being when a user requests an abort through the frontend, which
923 results in an rpc from the afe to abort_host_queue_entries.
924 """
jamesrene7c65cb2010-06-08 20:38:10 +0000925 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000926 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700927 where='aborted=1 and complete=0'):
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800928
929 # If the job is running on a shard, let the shard handle aborting
930 # it and sync back the right status.
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -0800931 if entry.job.shard_id is not None and not server_utils.is_shard():
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800932 logging.info('Waiting for shard %s to abort hqe %s',
933 entry.job.shard_id, entry)
934 continue
935
showardf4a2e502009-07-28 20:06:39 +0000936 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800937
938 # The task would have started off with both is_complete and
939 # is_active = False. Aborted tasks are neither active nor complete.
940 # For all currently active tasks this will happen through the agent,
941 # but we need to manually update the special tasks that haven't
942 # started yet, because they don't have agents.
943 models.SpecialTask.objects.filter(is_active=False,
944 queue_entry_id=entry.id).update(is_complete=True)
945
showardd3dc1992009-04-22 21:01:40 +0000946 for agent in self.get_agents_for_entry(entry):
947 agent.abort()
948 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000949 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700950 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000951 for job in jobs_to_stop:
952 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000953
954
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800955 @_calls_log_tick_msg
beeps8bb1f7d2013-08-05 01:30:09 -0700956 def _find_aborted_special_tasks(self):
957 """
958 Find SpecialTasks that have been marked for abortion.
959
960 Poll the database looking for SpecialTasks that are active
961 and have been marked for abortion, then abort them.
962 """
963
964 # The completed and active bits are very important when it comes
965 # to scheduler correctness. The active bit is set through the prolog
966 # of a special task, and reset through the cleanup method of the
967 # SpecialAgentTask. The cleanup is called both through the abort and
968 # epilog. The complete bit is set in several places, and in general
969 # a hanging job will have is_active=1 is_complete=0, while a special
970 # task which completed will have is_active=0 is_complete=1. To check
971 # aborts we directly check active because the complete bit is set in
972 # several places, including the epilog of agent tasks.
973 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
974 is_aborted=True)
975 for task in aborted_tasks:
976 # There are 2 ways to get the agent associated with a task,
977 # through the host and through the hqe. A special task
978 # always needs a host, but doesn't always need a hqe.
979 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700980 if isinstance(agent.task, agent_task.SpecialAgentTask):
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000981
beeps8bb1f7d2013-08-05 01:30:09 -0700982 # The epilog preforms critical actions such as
983 # queueing the next SpecialTask, requeuing the
984 # hqe etc, however it doesn't actually kill the
985 # monitor process and set the 'done' bit. Epilogs
986 # assume that the job failed, and that the monitor
987 # process has already written an exit code. The
988 # done bit is a necessary condition for
989 # _handle_agents to schedule any more special
990 # tasks against the host, and it must be set
991 # in addition to is_active, is_complete and success.
992 agent.task.epilog()
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000993 agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700994
995
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700996 def _can_start_agent(self, agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000997 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000998 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000999 return True
1000 # don't allow any nonzero-process agents to run after we've reached a
1001 # limit (this avoids starvation of many-process agents)
1002 if have_reached_limit:
1003 return False
1004 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001005 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +00001006 agent.task.owner_username,
1007 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +00001008 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001009 return False
showard4c5374f2008-09-04 17:02:56 +00001010 return True
1011
1012
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -08001013 @_calls_log_tick_msg
jadmanski0afbb632008-06-06 21:10:57 +00001014 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -07001015 """
1016 Handles agents of the dispatcher.
1017
1018 Appropriate Agents are added to the dispatcher through
1019 _schedule_running_host_queue_entries. These agents each
1020 have a task. This method runs the agents task through
1021 agent.tick() leading to:
1022 agent.start
1023 prolog -> AgentTasks prolog
1024 For each queue entry:
1025 sets host status/status to Running
1026 set started_on in afe_host_queue_entries
1027 run -> AgentTasks run
1028 Creates PidfileRunMonitor
1029 Queues the autoserv command line for this AgentTask
1030 via the drone manager. These commands are executed
1031 through the drone managers execute actions.
1032 poll -> AgentTasks/BaseAgentTask poll
1033 checks the monitors exit_code.
1034 Executes epilog if task is finished.
1035 Executes AgentTasks _finish_task
1036 finish_task is usually responsible for setting the status
1037 of the HQE/host, and updating it's active and complete fileds.
1038
1039 agent.is_done
1040 Removed the agent from the dispatchers _agents queue.
1041 Is_done checks the finished bit on the agent, that is
1042 set based on the Agents task. During the agents poll
1043 we check to see if the monitor process has exited in
1044 it's finish method, and set the success member of the
1045 task based on this exit code.
1046 """
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001047 num_started_this_tick = 0
1048 num_finished_this_tick = 0
showard4c5374f2008-09-04 17:02:56 +00001049 have_reached_limit = False
1050 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -07001051 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +00001052 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -07001053 self._log_extra_msg('Processing Agent with Host Ids: %s and '
1054 'queue_entry ids:%s' % (agent.host_ids,
1055 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +00001056 if not agent.started:
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001057 if not self._can_start_agent(agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001058 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -07001059 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +00001060 continue
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001061 num_started_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001062 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +00001063 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -07001064 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +00001065 if agent.is_done():
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001066 num_finished_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001067 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +00001068 self.remove_agent(agent)
Prathmesh Prabhued7ece92016-11-23 11:19:43 -08001069
1070 metrics.Counter(
1071 'chromeos/autotest/scheduler/agent_processes_started'
1072 ).increment_by(num_started_this_tick)
1073 metrics.Counter(
1074 'chromeos/autotest/scheduler/agent_processes_finished'
1075 ).increment_by(num_finished_this_tick)
1076 num_agent_processes = _drone_manager.total_running_processes()
1077 metrics.Gauge(
1078 'chromeos/autotest/scheduler/agent_processes'
1079 ).set(num_agent_processes)
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001080 logging.info('%d running processes. %d added this tick.',
Prathmesh Prabhued7ece92016-11-23 11:19:43 -08001081 num_agent_processes, num_started_this_tick)
mbligh36768f02008-02-22 18:28:33 +00001082
1083
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -08001084 @_calls_log_tick_msg
showard29f7cd22009-04-29 21:16:24 +00001085 def _process_recurring_runs(self):
1086 recurring_runs = models.RecurringRun.objects.filter(
1087 start_date__lte=datetime.datetime.now())
1088 for rrun in recurring_runs:
1089 # Create job from template
1090 job = rrun.job
1091 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001092 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001093
1094 host_objects = info['hosts']
1095 one_time_hosts = info['one_time_hosts']
1096 metahost_objects = info['meta_hosts']
1097 dependencies = info['dependencies']
1098 atomic_group = info['atomic_group']
1099
1100 for host in one_time_hosts or []:
1101 this_host = models.Host.create_one_time_host(host.hostname)
1102 host_objects.append(this_host)
1103
1104 try:
1105 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001106 options=options,
showard29f7cd22009-04-29 21:16:24 +00001107 host_objects=host_objects,
1108 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001109 atomic_group=atomic_group)
1110
1111 except Exception, ex:
1112 logging.exception(ex)
1113 #TODO send email
1114
1115 if rrun.loop_count == 1:
1116 rrun.delete()
1117 else:
1118 if rrun.loop_count != 0: # if not infinite loop
1119 # calculate new start_date
1120 difference = datetime.timedelta(seconds=rrun.loop_period)
1121 rrun.start_date = rrun.start_date + difference
1122 rrun.loop_count -= 1
1123 rrun.save()
1124
1125
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -08001126 def _log_tick_msg(self, msg):
1127 if self._tick_debug:
1128 logging.debug(msg)
1129
1130
    def _log_extra_msg(self, msg):
        """Debug-log a very verbose message when extra debugging is on."""
        if self._extra_debugging:
            logging.debug(msg)
1134
1135
Simran Basia858a232012-08-21 11:04:37 -07001136SiteDispatcher = utils.import_site_class(
1137 __file__, 'autotest_lib.scheduler.site_monitor_db',
1138 'SiteDispatcher', BaseDispatcher)
1139
1140class Dispatcher(SiteDispatcher):
1141 pass
1142
1143
mbligh36768f02008-02-22 18:28:33 +00001144class Agent(object):
showard77182562009-06-10 00:16:05 +00001145 """
Alex Miller47715eb2013-07-24 03:34:01 -07001146 An agent for use by the Dispatcher class to perform a task. An agent wraps
1147 around an AgentTask mainly to associate the AgentTask with the queue_entry
1148 and host ids.
showard77182562009-06-10 00:16:05 +00001149
1150 The following methods are required on all task objects:
1151 poll() - Called periodically to let the task check its status and
1152 update its internal state. If the task succeeded.
1153 is_done() - Returns True if the task is finished.
1154 abort() - Called when an abort has been requested. The task must
1155 set its aborted attribute to True if it actually aborted.
1156
1157 The following attributes are required on all task objects:
1158 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001159 success - bool, True if this task succeeded.
1160 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1161 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001162 """
1163
1164
showard418785b2009-11-23 20:19:59 +00001165 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001166 """
Alex Miller47715eb2013-07-24 03:34:01 -07001167 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001168 """
showard8cc058f2009-09-08 16:26:33 +00001169 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001170
showard77182562009-06-10 00:16:05 +00001171 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001172 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001173
showard8cc058f2009-09-08 16:26:33 +00001174 self.queue_entry_ids = task.queue_entry_ids
1175 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001176
showard8cc058f2009-09-08 16:26:33 +00001177 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001178 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001179
1180
jadmanski0afbb632008-06-06 21:10:57 +00001181 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001182 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001183 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001184 self.task.poll()
1185 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001186 self.finished = True
showardec113162008-05-08 00:52:49 +00001187
1188
jadmanski0afbb632008-06-06 21:10:57 +00001189 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001190 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001191
1192
showardd3dc1992009-04-22 21:01:40 +00001193 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001194 if self.task:
1195 self.task.abort()
1196 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001197 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001198 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001199
showardd3dc1992009-04-22 21:01:40 +00001200
beeps5e2bb4a2013-10-28 11:26:45 -07001201class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001202 """
1203 Common functionality for QueueTask and HostlessQueueTask
1204 """
    def __init__(self, queue_entries):
        """
        @param queue_entries: HostQueueEntry objects this task will run.
                The first entry's job is taken as the task's job
                (presumably all entries share one job — TODO confirm).
        """
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001209
1210
showard73ec0442009-02-07 02:05:20 +00001211 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001212 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001213
1214
jamesrenc44ae992010-02-19 00:12:54 +00001215 def _write_control_file(self, execution_path):
1216 control_path = _drone_manager.attach_file_to_execution(
1217 execution_path, self.job.control_file)
1218 return control_path
1219
1220
Aviv Keshet308e7362013-05-21 14:43:16 -07001221 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001222 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001223 execution_path = self.queue_entries[0].execution_path()
1224 control_path = self._write_control_file(execution_path)
1225 hostnames = ','.join(entry.host.hostname
1226 for entry in self.queue_entries
1227 if not entry.is_hostless())
1228
1229 execution_tag = self.queue_entries[0].execution_tag()
1230 params = _autoserv_command_line(
1231 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001232 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001233 _drone_manager.absolute_path(control_path)],
1234 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001235 if self.job.is_image_update_job():
1236 params += ['--image', self.job.update_image_path]
1237
jamesrenc44ae992010-02-19 00:12:54 +00001238 return params
showardd1195652009-12-08 22:21:02 +00001239
1240
1241 @property
1242 def num_processes(self):
1243 return len(self.queue_entries)
1244
1245
1246 @property
1247 def owner_username(self):
1248 return self.job.owner
1249
1250
1251 def _working_directory(self):
1252 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001253
1254
jadmanski0afbb632008-06-06 21:10:57 +00001255 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001256 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001257 keyval_dict = self.job.keyval_dict()
1258 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001259 group_name = self.queue_entries[0].get_group_name()
1260 if group_name:
1261 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001262 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001263 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001264 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001265 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001266
1267
showard35162b02009-03-03 02:17:30 +00001268 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001269 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001270 _drone_manager.write_lines_to_file(error_file_path,
1271 [_LOST_PROCESS_ERROR])
1272
1273
showardd3dc1992009-04-22 21:01:40 +00001274 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001275 if not self.monitor:
1276 return
1277
showardd9205182009-04-27 20:09:55 +00001278 self._write_job_finished()
1279
showard35162b02009-03-03 02:17:30 +00001280 if self.monitor.lost_process:
1281 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001282
jadmanskif7fa2cc2008-10-01 14:13:23 +00001283
showardcbd74612008-11-19 21:42:02 +00001284 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001285 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001286 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001287 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001288 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001289
1290
jadmanskif7fa2cc2008-10-01 14:13:23 +00001291 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001292 if not self.monitor or not self.monitor.has_process():
1293 return
1294
jadmanskif7fa2cc2008-10-01 14:13:23 +00001295 # build up sets of all the aborted_by and aborted_on values
1296 aborted_by, aborted_on = set(), set()
1297 for queue_entry in self.queue_entries:
1298 if queue_entry.aborted_by:
1299 aborted_by.add(queue_entry.aborted_by)
1300 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1301 aborted_on.add(t)
1302
1303 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001304 # TODO(showard): this conditional is now obsolete, we just need to leave
1305 # it in temporarily for backwards compatibility over upgrades. delete
1306 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001307 assert len(aborted_by) <= 1
1308 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001309 aborted_by_value = aborted_by.pop()
1310 aborted_on_value = max(aborted_on)
1311 else:
1312 aborted_by_value = 'autotest_system'
1313 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001314
showarda0382352009-02-11 23:36:43 +00001315 self._write_keyval_after_job("aborted_by", aborted_by_value)
1316 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001317
showardcbd74612008-11-19 21:42:02 +00001318 aborted_on_string = str(datetime.datetime.fromtimestamp(
1319 aborted_on_value))
1320 self._write_status_comment('Job aborted by %s on %s' %
1321 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001322
1323
jadmanski0afbb632008-06-06 21:10:57 +00001324 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001325 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001326 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001327 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001328
1329
jadmanski0afbb632008-06-06 21:10:57 +00001330 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001331 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001332 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001333
1334
class QueueTask(AbstractQueueTask):
    """AbstractQueueTask for jobs that run against one or more hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)
        # Whether autoserv may be launched inside a server-side-packaging
        # container (AUTOSERV/enable_ssp_container, default True).
        config = global_config.global_config
        self._enable_ssp_container = config.get_config_value(
                'AUTOSERV', 'enable_ssp_container', type=bool, default=True)


    def prolog(self):
        """Sanity-check statuses, then mark hosts as RUNNING and dirty."""
        allowed_hqe = (models.HostQueueEntry.Status.STARTING,
                       models.HostQueueEntry.Status.RUNNING)
        allowed_host = (models.Host.Status.PENDING,
                        models.Host.Status.RUNNING)
        self._check_queue_entry_statuses(self.queue_entries,
                                         allowed_hqe_statuses=allowed_hqe,
                                         allowed_host_statuses=allowed_host)

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)


    def _finish_task(self):
        """Move entries to GATHERING; hosts stay RUNNING until cleanup."""
        super(QueueTask, self)._finish_task()

        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Extend the base autoserv command line with SSP and build flags."""
        args = super(QueueTask, self)._command_line()
        # Server-side packaging applies only to server-type control files,
        # and only unless the job explicitly opted out (require_ssp == False;
        # None means "use the default", hence the != False comparison).
        use_ssp = (self._enable_ssp_container and
                   self.job.control_type == control_data.CONTROL_TYPE.SERVER
                   and self.job.require_ssp != False)
        if use_ssp:
            args.append('--require-ssp')
            build = self.job.keyval_dict().get('test_source_build', None)
            if build:
                args.extend(['--test_source_build', build])
        if self.job.parent_job_id:
            args.extend(['--parent_job_id', str(self.job.parent_job_id)])
        return args + ['--verify_job_repo_url']
Alex Miller9f01d5d2013-08-08 02:26:01 -07001383
1384
Dan Shi1a189052013-10-28 14:41:35 -07001385class HostlessQueueTask(AbstractQueueTask):
mbligh4608b002010-01-05 18:22:35 +00001386 def __init__(self, queue_entry):
1387 super(HostlessQueueTask, self).__init__([queue_entry])
1388 self.queue_entry_ids = [queue_entry.id]
1389
1390
1391 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001392 super(HostlessQueueTask, self).prolog()
1393
1394
mbligh4608b002010-01-05 18:22:35 +00001395 def _finish_task(self):
1396 super(HostlessQueueTask, self)._finish_task()
Dan Shi76af8022013-10-19 01:59:49 -07001397
1398 # When a job is added to database, its initial status is always
1399 # Starting. In a scheduler tick, scheduler finds all jobs in Starting
1400 # status, check if any of them can be started. If scheduler hits some
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001401 # limit, e.g., max_hostless_jobs_per_drone, scheduler will
1402 # leave these jobs in Starting status. Otherwise, the jobs'
1403 # status will be changed to Running, and an autoserv process
1404 # will be started in drone for each of these jobs.
Dan Shi76af8022013-10-19 01:59:49 -07001405 # If the entry is still in status Starting, the process has not started
1406 # yet. Therefore, there is no need to parse and collect log. Without
1407 # this check, exception will be raised by scheduler as execution_subdir
1408 # for this queue entry does not have a value yet.
1409 hqe = self.queue_entries[0]
1410 if hqe.status != models.HostQueueEntry.Status.STARTING:
1411 hqe.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001412
1413
mbligh36768f02008-02-22 18:28:33 +00001414if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00001415 main()