blob: 405f65f71486d00ab0dccdf1feebbb221d13a1d5 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Richard Barnetteffed1722016-05-18 15:57:22 -07002
3#pylint: disable=C0111
4
mbligh36768f02008-02-22 18:28:33 +00005"""
6Autotest scheduler
7"""
showard909c7a62008-07-15 21:52:38 +00008
Dan Shif6c65bd2014-08-29 16:15:07 -07009import datetime
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -080010import functools
Dan Shif6c65bd2014-08-29 16:15:07 -070011import gc
12import logging
13import optparse
14import os
15import signal
16import sys
17import time
showard402934a2009-12-21 22:20:47 +000018
Alex Miller05d7b4c2013-03-04 07:49:38 -080019import common
showard21baa452008-10-21 00:08:39 +000020from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000021
22import django.db
23
Dan Shiec1d47d2015-02-13 11:38:13 -080024from autotest_lib.client.common_lib import control_data
Prashanth B0e960282014-05-13 19:38:28 -070025from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070026from autotest_lib.client.common_lib import utils
xixuan4de4e742017-01-30 13:31:35 -080027from autotest_lib.frontend.afe import models
Fang Dengc330bee2014-10-21 18:10:55 -070028from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070029from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
30from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070031from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070032from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070033from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000034from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080035from autotest_lib.scheduler import status_server, scheduler_config
Aviv Keshet308e7362013-05-21 14:43:16 -070036from autotest_lib.server import autoserv_utils
Dan Shi114e1722016-01-10 18:12:53 -080037from autotest_lib.server import system_utils
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080038from autotest_lib.server import utils as server_utils
Dan Shicf2e8dd2015-05-07 17:18:48 -070039from autotest_lib.site_utils import metadata_reporter
Dan Shib9144a42014-12-01 16:09:32 -080040from autotest_lib.site_utils import server_manager_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080041
Dan Shi5e2efb72017-02-07 11:40:23 -080042try:
43 from chromite.lib import metrics
44 from chromite.lib import ts_mon_config
45except ImportError:
46 metrics = utils.metrics_mock
47 ts_mon_config = utils.metrics_mock
48
Dan Shicf2e8dd2015-05-07 17:18:48 -070049
# Pidfile prefixes recognized by utils.write_pid()/utils.program_is_alive().
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Default results directory; replaced by the positional CLI argument in
# main_without_exception_handling().
RESULTS_DIR = '.'
# Root of the autotest tree, derived from this file's location (may be
# overridden by the AUTOTEST_DIR environment variable below).
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
55
# Allow an environment override of the autotest tree location.
# dict.has_key() is deprecated in Python 2 and removed in Python 3;
# the `in` operator is equivalent and works in both.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

# Ensure the server directory is importable from this process.
if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000063
showard35162b02009-03-03 02:17:30 +000064# error message to leave in results dir when an autoserv process disappears
65# mysteriously
66_LOST_PROCESS_ERROR = """\
67Autoserv failed abnormally during execution for this job, probably due to a
68system error on the Autotest server. Full results may not be available. Sorry.
69"""
70
# Database connection manager and raw connection; both bound in initialize().
_db_manager = None
_db = None
# Flipped to True by handle_signal() to request a graceful main-loop exit.
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
# DroneManager singleton; bound in initialize_globals().
_drone_manager = None
mbligh36768f02008-02-22 18:28:33 +000081
mbligh83c1e9e2009-05-01 23:10:41 +000082def _site_init_monitor_db_dummy():
83 return {}
84
85
def _verify_default_drone_set_exists():
    """Raise SchedulerError when drone sets are on but no default is set."""
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise scheduler_lib.SchedulerError(
            'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000091
92
def _sanity_check():
    """Verify scheduler configuration consistency before startup."""
    _verify_default_drone_set_exists()
96
97
def main():
    """Entry point: run the scheduler, log escaping errors, clean up pidfile.

    SystemExit propagates untouched; any other exception (including
    KeyboardInterrupt, hence the bare except) is logged and re-raised.
    The pidfile is removed on every exit path.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000109
110
def main_without_exception_handling():
    """Parse options, configure the process, and run the scheduler loop.

    Expects exactly one positional argument: the results directory.
    Exits with status 1 when the scheduler is disabled in global_config.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Site hook: falls back to the no-op _site_init_monitor_db_dummy when no
    # site_monitor_db module is present.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    # Start the thread to report metadata.
    metadata_reporter.start()

    with ts_mon_config.SetupTsMonGlobalState('autotest_scheduler',
                                             indirect=True):
        try:
            initialize()
            dispatcher = Dispatcher()
            dispatcher.initialize(recover_hosts=options.recover_hosts)
            minimum_tick_sec = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)

            # Main loop: one dispatcher tick per iteration, padded with sleep
            # so a tick never takes less than minimum_tick_sec wall time.
            while not _shutdown and not server._shutdown_scheduler:
                start = time.time()
                dispatcher.tick()
                curr_tick_sec = time.time() - start
                if minimum_tick_sec > curr_tick_sec:
                    time.sleep(minimum_tick_sec - curr_tick_sec)
                else:
                    time.sleep(0.0001)
        except server_manager_utils.ServerActionError as e:
            # This error is expected when the server is not in primary status
            # for scheduler role. Thus do not send email for it.
            logging.exception(e)
        except Exception:
            email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    # Orderly teardown: metadata thread, queued mail, status server, drones,
    # then the database connection.
    metadata_reporter.abort()
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000204
205
def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler: ask the main loop to exit gracefully.

    @param signum: received signal number (unused).
    @param frame: current stack frame (unused).
    """
    logging.info("Shutdown request received.")
    global _shutdown
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000210
211
def initialize():
    """One-time process setup: pidfile, DB connection, signals, drones.

    Exits with status 1 if another monitor_db instance already holds the
    pidfile. Binds the module globals _db_manager, _db and (via
    initialize_globals) _drone_manager.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start a second scheduler against the same pidfile.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point the scheduler at the stress-test database instead of prod.
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    # If server database is enabled, check if the server has role `scheduler`.
    # If the server does not have scheduler role, exception will be raised and
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    # Hand the drone manager its drone list and the host that stores results.
    drone_list = system_utils.get_drones()
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000251
252
def initialize_globals():
    """Bind the module-level _drone_manager to the DroneManager singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
256
257
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """Build the autoserv command line for a job or queue entry.

    @param machines: string - a machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args: list - additional arguments to pass to autoserv.
    @param job: Job object - if supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry: HostQueueEntry object - used to look up the Job when
            no Job object was supplied.
    @returns the autoserv command line as a list of executable + parameters.
    """
    return autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose, in_lab=True)
showard87ba02a2009-04-20 19:37:32 +0000276
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800277def _calls_log_tick_msg(func):
278 """Used to trace functions called by BaseDispatcher.tick."""
279 @functools.wraps(func)
280 def wrapper(self, *args, **kwargs):
281 self._log_tick_msg('Starting %s' % func.__name__)
282 return func(self, *args, **kwargs)
283
284 return wrapper
285
showard87ba02a2009-04-20 19:37:32 +0000286
Simran Basia858a232012-08-21 11:04:37 -0700287class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800288
289
    def __init__(self):
        """Set up dispatcher state, cleanup tasks, and config-driven knobs."""
        # Active Agent objects; indexed below by host id and queue entry id.
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db, _drone_manager)
        # Maps host id -> set of Agents, and queue entry id -> set of Agents.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        # GC stats reporting is rate-limited; see _garbage_collection().
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)
        self._inline_host_acquisition = (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'inline_host_acquisition', type=bool, default=True))

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if self._inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
325
mbligh36768f02008-02-22 18:28:33 +0000326
showard915958d2009-04-22 21:00:58 +0000327 def initialize(self, recover_hosts=True):
328 self._periodic_cleanup.initialize()
329 self._24hr_upkeep.initialize()
Dan Shi55d58992015-05-05 09:10:02 -0700330 # Execute all actions queued in the cleanup tasks. Scheduler tick will
331 # run a refresh task first. If there is any action in the queue, refresh
332 # will raise an exception.
333 _drone_manager.execute_actions()
showard915958d2009-04-22 21:00:58 +0000334
jadmanski0afbb632008-06-06 21:10:57 +0000335 # always recover processes
336 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000337
jadmanski0afbb632008-06-06 21:10:57 +0000338 if recover_hosts:
339 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000340
341
    # TODO(pprabhu) Drop this metric once tick_times has been verified.
    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/scheduler/tick_durations/tick')
    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.

        Each phase runs inside a breakdown_timer.Step so its duration is
        reported separately to monitoring.
        """
        with metrics.RuntimeBreakdownTimer(
            'chromeos/autotest/scheduler/tick_times') as breakdown_timer:
            self._log_tick_msg('New tick')
            system_utils.DroneCache.refresh()

            with breakdown_timer.Step('garbage_collection'):
                self._garbage_collection()
            with breakdown_timer.Step('trigger_refresh'):
                self._log_tick_msg('Starting _drone_manager.trigger_refresh')
                _drone_manager.trigger_refresh()
            with breakdown_timer.Step('schedule_delay_tasks'):
                self._schedule_delay_tasks()
            with breakdown_timer.Step('schedule_running_host_queue_entries'):
                self._schedule_running_host_queue_entries()
            with breakdown_timer.Step('schedule_special_tasks'):
                self._schedule_special_tasks()
            with breakdown_timer.Step('schedule_new_jobs'):
                self._schedule_new_jobs()
            with breakdown_timer.Step('sync_refresh'):
                self._log_tick_msg('Starting _drone_manager.sync_refresh')
                _drone_manager.sync_refresh()
            # _run_cleanup must be called between drone_manager.sync_refresh,
            # and drone_manager.execute_actions, as sync_refresh will clear the
            # calls queued in drones. Therefore, any action that calls
            # drone.queue_call to add calls to the drone._calls, should be after
            # drone refresh is completed and before
            # drone_manager.execute_actions at the end of the tick.
            with breakdown_timer.Step('run_cleanup'):
                self._run_cleanup()
            with breakdown_timer.Step('find_aborting'):
                self._find_aborting()
            with breakdown_timer.Step('find_aborted_special_tasks'):
                self._find_aborted_special_tasks()
            with breakdown_timer.Step('handle_agents'):
                self._handle_agents()
            with breakdown_timer.Step('host_scheduler_tick'):
                self._log_tick_msg('Starting _host_scheduler.tick')
                self._host_scheduler.tick()
            with breakdown_timer.Step('drones_execute_actions'):
                self._log_tick_msg('Starting _drone_manager.execute_actions')
                _drone_manager.execute_actions()
            with breakdown_timer.Step('send_queued_emails'):
                self._log_tick_msg(
                        'Starting email_manager.manager.send_queued_emails')
                email_manager.manager.send_queued_emails()
            with breakdown_timer.Step('db_reset_queries'):
                self._log_tick_msg('Starting django.db.reset_queries')
                django.db.reset_queries()

            self._tick_count += 1
            metrics.Counter('chromeos/autotest/scheduler/tick').increment()
mbligh36768f02008-02-22 18:28:33 +0000402
showard97aed502008-11-04 02:01:24 +0000403
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800404 @_calls_log_tick_msg
mblighf3294cc2009-04-08 21:17:38 +0000405 def _run_cleanup(self):
406 self._periodic_cleanup.run_cleanup_maybe()
407 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000408
mbligh36768f02008-02-22 18:28:33 +0000409
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800410 @_calls_log_tick_msg
showardf13a9e22009-12-18 22:54:09 +0000411 def _garbage_collection(self):
412 threshold_time = time.time() - self._seconds_between_garbage_stats
413 if threshold_time < self._last_garbage_stats_time:
414 # Don't generate these reports very often.
415 return
416
417 self._last_garbage_stats_time = time.time()
418 # Force a full level 0 collection (because we can, it doesn't hurt
419 # at this interval).
420 gc.collect()
421 logging.info('Logging garbage collector stats on tick %d.',
422 self._tick_count)
423 gc_stats._log_garbage_collector_stats()
424
425
showard170873e2009-01-07 00:22:26 +0000426 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
427 for object_id in object_ids:
428 agent_dict.setdefault(object_id, set()).add(agent)
429
430
431 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
432 for object_id in object_ids:
433 assert object_id in agent_dict
434 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700435 # If an ID has no more active agent associated, there is no need to
436 # keep it in the dictionary. Otherwise, scheduler will keep an
437 # unnecessarily big dictionary until being restarted.
438 if not agent_dict[object_id]:
439 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000440
441
showardd1195652009-12-08 22:21:02 +0000442 def add_agent_task(self, agent_task):
beeps8bb1f7d2013-08-05 01:30:09 -0700443 """
444 Creates and adds an agent to the dispatchers list.
445
446 In creating the agent we also pass on all the queue_entry_ids and
447 host_ids from the special agent task. For every agent we create, we
448 add it to 1. a dict against the queue_entry_ids given to it 2. A dict
449 against the host_ids given to it. So theoritically, a host can have any
450 number of agents associated with it, and each of them can have any
451 special agent task, though in practice we never see > 1 agent/task per
452 host at any time.
453
454 @param agent_task: A SpecialTask for the agent to manage.
455 """
showardd1195652009-12-08 22:21:02 +0000456 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000457 self._agents.append(agent)
458 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000459 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
460 self._register_agent_for_ids(self._queue_entry_agents,
461 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000462
showard170873e2009-01-07 00:22:26 +0000463
464 def get_agents_for_entry(self, queue_entry):
465 """
466 Find agents corresponding to the specified queue_entry.
467 """
showardd3dc1992009-04-22 21:01:40 +0000468 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000469
470
471 def host_has_agent(self, host):
472 """
473 Determine if there is currently an Agent present using this host.
474 """
475 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000476
477
jadmanski0afbb632008-06-06 21:10:57 +0000478 def remove_agent(self, agent):
479 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000480 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
481 agent)
482 self._unregister_agent_for_ids(self._queue_entry_agents,
483 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000484
485
showard8cc058f2009-09-08 16:26:33 +0000486 def _host_has_scheduled_special_task(self, host):
487 return bool(models.SpecialTask.objects.filter(host__id=host.id,
488 is_active=False,
489 is_complete=False))
490
491
    def _recover_processes(self):
        """Reattach to autoserv processes left over from a previous scheduler.

        Order matters: pidfiles must be registered before the drone refresh
        so the refresh can pick them up, and tasks are recovered before
        pending/verifying entries are re-examined.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000504
showard170873e2009-01-07 00:22:26 +0000505
showardd1195652009-12-08 22:21:02 +0000506 def _create_recovery_agent_tasks(self):
507 return (self._get_queue_entry_agent_tasks()
508 + self._get_special_task_agent_tasks(is_active=True))
509
510
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        Also reports a per-status gauge of active host queue entries.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        hqe_count_by_status = {}
        for entry in queue_entries:
            hqe_count_by_status[entry.status] = (
                hqe_count_by_status.get(entry.status, 0) + 1)
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            # A group task may cover several entries; mark them all consumed.
            used_queue_entries.update(agent_task.queue_entries)

        for status, count in hqe_count_by_status.iteritems():
            metrics.Gauge(
                'chromeos/autotest/scheduler/active_host_queue_entries'
            ).set(count, fields={'status': status})

        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000556
557
showardd1195652009-12-08 22:21:02 +0000558 def _get_special_task_agent_tasks(self, is_active=False):
559 special_tasks = models.SpecialTask.objects.filter(
560 is_active=is_active, is_complete=False)
561 return [self._get_agent_task_for_special_task(task)
562 for task in special_tasks]
563
564
565 def _get_agent_task_for_queue_entry(self, queue_entry):
566 """
beeps8bb1f7d2013-08-05 01:30:09 -0700567 Construct an AgentTask instance for the given active HostQueueEntry.
568
showardd1195652009-12-08 22:21:02 +0000569 @param queue_entry: a HostQueueEntry
beeps8bb1f7d2013-08-05 01:30:09 -0700570 @return: an AgentTask to run the queue entry
showardd1195652009-12-08 22:21:02 +0000571 """
572 task_entries = queue_entry.job.get_group_entries(queue_entry)
573 self._check_for_duplicate_host_entries(task_entries)
574
575 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
576 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000577 if queue_entry.is_hostless():
578 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000579 return QueueTask(queue_entries=task_entries)
580 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
beeps5e2bb4a2013-10-28 11:26:45 -0700581 return postjob_task.GatherLogsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000582 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
beeps5e2bb4a2013-10-28 11:26:45 -0700583 return postjob_task.FinalReparseTask(queue_entries=task_entries)
mbligh4608b002010-01-05 18:22:35 +0000584 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
beeps5e2bb4a2013-10-28 11:26:45 -0700585 return postjob_task.ArchiveResultsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000586
Prashanth B0e960282014-05-13 19:38:28 -0700587 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800588 '_get_agent_task_for_queue_entry got entry with '
589 'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000590
591
592 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000593 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
594 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000595 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000596 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000597 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000598 if using_host:
showardd1195652009-12-08 22:21:02 +0000599 self._assert_host_has_no_agent(task_entry)
600
601
602 def _assert_host_has_no_agent(self, entry):
603 """
604 @param entry: a HostQueueEntry or a SpecialTask
605 """
606 if self.host_has_agent(entry.host):
607 agent = tuple(self._host_agents.get(entry.host.id))[0]
Prashanth B0e960282014-05-13 19:38:28 -0700608 raise scheduler_lib.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000609 'While scheduling %s, host %s already has a host agent %s'
610 % (entry, entry.host, agent.task))
611
612
613 def _get_agent_task_for_special_task(self, special_task):
614 """
615 Construct an AgentTask class to run the given SpecialTask and add it
616 to this dispatcher.
beeps8bb1f7d2013-08-05 01:30:09 -0700617
MK Ryu35d661e2014-09-25 17:44:10 -0700618 A special task is created through schedule_special_tasks, but only if
beeps8bb1f7d2013-08-05 01:30:09 -0700619 the host doesn't already have an agent. This happens through
620 add_agent_task. All special agent tasks are given a host on creation,
621 and a Null hqe. To create a SpecialAgentTask object, you need a
622 models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
623 object contains a hqe it's passed on to the special agent task, which
624 creates a HostQueueEntry and saves it as it's queue_entry.
625
showardd1195652009-12-08 22:21:02 +0000626 @param special_task: a models.SpecialTask instance
627 @returns an AgentTask to run this SpecialTask
628 """
629 self._assert_host_has_no_agent(special_task)
630
beeps5e2bb4a2013-10-28 11:26:45 -0700631 special_agent_task_classes = (prejob_task.CleanupTask,
632 prejob_task.VerifyTask,
633 prejob_task.RepairTask,
634 prejob_task.ResetTask,
635 prejob_task.ProvisionTask)
636
showardd1195652009-12-08 22:21:02 +0000637 for agent_task_class in special_agent_task_classes:
638 if agent_task_class.TASK_TYPE == special_task.task:
639 return agent_task_class(task=special_task)
640
Prashanth B0e960282014-05-13 19:38:28 -0700641 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800642 'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000643
644
645 def _register_pidfiles(self, agent_tasks):
646 for agent_task in agent_tasks:
647 agent_task.register_necessary_pidfiles()
648
649
650 def _recover_tasks(self, agent_tasks):
651 orphans = _drone_manager.get_orphaned_autoserv_processes()
652
653 for agent_task in agent_tasks:
654 agent_task.recover()
655 if agent_task.monitor and agent_task.monitor.has_process():
656 orphans.discard(agent_task.monitor.get_process())
657 self.add_agent_task(agent_task)
658
659 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000660
661
showard8cc058f2009-09-08 16:26:33 +0000662 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000663 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
664 % status):
showard0db3d432009-10-12 20:29:15 +0000665 if entry.status == status and not self.get_agents_for_entry(entry):
666 # The status can change during iteration, e.g., if job.run()
667 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000668 yield entry
669
670
showard6878e8b2009-07-20 22:37:45 +0000671 def _check_for_remaining_orphan_processes(self, orphans):
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800672 m = 'chromeos/autotest/errors/unrecovered_orphan_processes'
673 metrics.Gauge(m).set(len(orphans))
674
showard6878e8b2009-07-20 22:37:45 +0000675 if not orphans:
676 return
677 subject = 'Unrecovered orphan autoserv processes remain'
678 message = '\n'.join(str(process) for process in orphans)
mbligh5fa9e112009-08-03 16:46:06 +0000679 die_on_orphans = global_config.global_config.get_config_value(
680 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
681
682 if die_on_orphans:
683 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000684
showard170873e2009-01-07 00:22:26 +0000685
showard8cc058f2009-09-08 16:26:33 +0000686 def _recover_pending_entries(self):
687 for entry in self._get_unassigned_entries(
688 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000689 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000690 entry.on_pending()
691
692
showardb8900452009-10-12 20:31:01 +0000693 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000694 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000695 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
696 unrecovered_hqes = []
697 for queue_entry in queue_entries:
698 special_tasks = models.SpecialTask.objects.filter(
699 task__in=(models.SpecialTask.Task.CLEANUP,
700 models.SpecialTask.Task.VERIFY),
701 queue_entry__id=queue_entry.id,
702 is_complete=False)
703 if special_tasks.count() == 0:
704 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000705
showardb8900452009-10-12 20:31:01 +0000706 if unrecovered_hqes:
707 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700708 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000709 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000710 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000711
712
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800713 @_calls_log_tick_msg
showard65db3932009-10-28 19:54:35 +0000714 def _schedule_special_tasks(self):
715 """
716 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700717
718 Special tasks include PreJobTasks like verify, reset and cleanup.
719 They are created through _schedule_new_jobs and associated with a hqe
720 This method translates SpecialTasks to the appropriate AgentTask and
721 adds them to the dispatchers agents list, so _handle_agents can execute
722 them.
showard65db3932009-10-28 19:54:35 +0000723 """
Prashanth B4ec98672014-05-15 10:44:54 -0700724 # When the host scheduler is responsible for acquisition we only want
725 # to run tasks with leased hosts. All hqe tasks will already have
726 # leased hosts, and we don't want to run frontend tasks till the host
727 # scheduler has vetted the assignment. Note that this doesn't include
728 # frontend tasks with hosts leased by other active hqes.
729 for task in self._job_query_manager.get_prioritized_special_tasks(
Prathmesh Prabhu468c32c2016-12-20 15:12:30 -0800730 only_tasks_with_leased_hosts=not self._inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000731 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000732 continue
showardd1195652009-12-08 22:21:02 +0000733 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000734
735
showard170873e2009-01-07 00:22:26 +0000736 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000737 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000738 # should never happen
showarded2afea2009-07-07 20:54:07 +0000739 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000740 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000741 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700742 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000743 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000744
745
jadmanski0afbb632008-06-06 21:10:57 +0000746 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000747 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700748 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000749 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000750 if self.host_has_agent(host):
751 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000752 continue
showard8cc058f2009-09-08 16:26:33 +0000753 if self._host_has_scheduled_special_task(host):
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700754 # host will have a special task scheduled on the next tick
showard8cc058f2009-09-08 16:26:33 +0000755 continue
showard170873e2009-01-07 00:22:26 +0000756 if print_message:
showardb18134f2009-03-20 20:52:18 +0000757 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000758 models.SpecialTask.objects.create(
759 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000760 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000761
762
jadmanski0afbb632008-06-06 21:10:57 +0000763 def _recover_hosts(self):
764 # recover "Repair Failed" hosts
765 message = 'Reverifying dead host %s'
766 self._reverify_hosts_where("status = 'Repair Failed'",
767 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000768
769
showard89f84db2009-03-12 20:39:13 +0000770 def _refresh_pending_queue_entries(self):
771 """
772 Lookup the pending HostQueueEntries and call our HostScheduler
773 refresh() method given that list. Return the list.
774
775 @returns A list of pending HostQueueEntries sorted in priority order.
776 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700777 queue_entries = self._job_query_manager.get_pending_queue_entries(
Prathmesh Prabhu468c32c2016-12-20 15:12:30 -0800778 only_hostless=not self._inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000779 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000780 return []
showard89f84db2009-03-12 20:39:13 +0000781 return queue_entries
782
783
showarda9545c02009-12-18 22:44:26 +0000784 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800785 """Schedule a hostless (suite) job.
786
787 @param queue_entry: The queue_entry representing the hostless job.
788 """
showarda9545c02009-12-18 22:44:26 +0000789 self.add_agent_task(HostlessQueueTask(queue_entry))
Jakob Juelichd615a1e2014-09-04 11:48:36 -0700790
791 # Need to set execution_subdir before setting the status:
792 # After a restart of the scheduler, agents will be restored for HQEs in
793 # Starting, Running, Gathering, Parsing or Archiving. To do this, the
794 # execution_subdir is needed. Therefore it must be set before entering
795 # one of these states.
796 # Otherwise, if the scheduler was interrupted between setting the status
797 # and the execution_subdir, upon it's restart restoring agents would
798 # fail.
799 # Is there a way to get a status in one of these states without going
800 # through this code? Following cases are possible:
801 # - If it's aborted before being started:
802 # active bit will be 0, so there's nothing to parse, it will just be
803 # set to completed by _find_aborting. Critical statuses are skipped.
804 # - If it's aborted or it fails after being started:
805 # It was started, so this code was executed.
806 queue_entry.update_field('execution_subdir', 'hostless')
jamesren47bd7372010-03-13 00:58:17 +0000807 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000808
809
beepscc9fc702013-12-02 12:45:38 -0800810 def _schedule_host_job(self, host, queue_entry):
811 """Schedules a job on the given host.
812
813 1. Assign the host to the hqe, if it isn't already assigned.
814 2. Create a SpecialAgentTask for the hqe.
815 3. Activate the hqe.
816
817 @param queue_entry: The job to schedule.
818 @param host: The host to schedule the job on.
819 """
820 if self.host_has_agent(host):
821 host_agent_task = list(self._host_agents.get(host.id))[0].task
beepscc9fc702013-12-02 12:45:38 -0800822 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700823 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800824
825
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800826 @_calls_log_tick_msg
showard89f84db2009-03-12 20:39:13 +0000827 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700828 """
829 Find any new HQEs and call schedule_pre_job_tasks for it.
830
831 This involves setting the status of the HQE and creating a row in the
832 db corresponding the the special task, through
833 scheduler_models._queue_special_task. The new db row is then added as
834 an agent to the dispatcher through _schedule_special_tasks and
835 scheduled for execution on the drone through _handle_agents.
836 """
showard89f84db2009-03-12 20:39:13 +0000837 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000838
beepscc9fc702013-12-02 12:45:38 -0800839 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700840 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700841 new_jobs_with_hosts = 0
842 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800843 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700844 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000845
beepscc9fc702013-12-02 12:45:38 -0800846 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000847 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000848 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700849 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000850 else:
beepscc9fc702013-12-02 12:45:38 -0800851 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700852 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700853
Prathmesh Prabhued7ece92016-11-23 11:19:43 -0800854 metrics.Counter(
855 'chromeos/autotest/scheduler/scheduled_jobs_hostless'
856 ).increment_by(new_hostless_jobs)
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800857
beepscc9fc702013-12-02 12:45:38 -0800858 if not host_jobs:
859 return
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800860
Prathmesh Prabhu468c32c2016-12-20 15:12:30 -0800861 if not self._inline_host_acquisition:
Prathmesh Prabhu3c6a3bf2016-12-20 11:29:02 -0800862 # In this case, host_scheduler is responsible for scheduling
863 # host_jobs. Scheduling the jobs ourselves can lead to DB corruption
864 # since host_scheduler assumes it is the single process scheduling
865 # host jobs.
866 metrics.Gauge(
867 'chromeos/autotest/errors/scheduler/unexpected_host_jobs').set(
868 len(host_jobs))
869 return
870
Prashanth Bf66d51b2014-05-06 12:42:25 -0700871 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
872 for host_assignment in jobs_with_hosts:
873 self._schedule_host_job(host_assignment.host, host_assignment.job)
874 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800875
Prathmesh Prabhued7ece92016-11-23 11:19:43 -0800876 metrics.Counter(
877 'chromeos/autotest/scheduler/scheduled_jobs_with_hosts'
878 ).increment_by(new_jobs_with_hosts)
879 # TODO(pprabhu): Decide what to do about this metric. Million dollar
880 # question: What happens to jobs that were not matched. Do they stay in
881 # the queue, and get processed right here in the next tick (then we want
882 # a guage corresponding to the number of outstanding unmatched host
883 # jobs), or are they handled somewhere else (then we need a counter
884 # corresponding to failed_to_match_with_hosts jobs).
885 #autotest_stats.Gauge(key).send('new_jobs_without_hosts',
886 # new_jobs_need_hosts -
887 # new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000888
889
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800890 @_calls_log_tick_msg
showard8cc058f2009-09-08 16:26:33 +0000891 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700892 """
893 Adds agents to the dispatcher.
894
895 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
896 QueueTask for example, will have a job with a control file, and
897 the agent will have methods that poll, abort and check if the queue
898 task is finished. The dispatcher runs the agent_task, as well as
899 other agents in it's _agents member, through _handle_agents, by
900 calling the Agents tick().
901
902 This method creates an agent for each HQE in one of (starting, running,
903 gathering, parsing, archiving) states, and adds it to the dispatcher so
904 it is handled by _handle_agents.
905 """
showardd1195652009-12-08 22:21:02 +0000906 for agent_task in self._get_queue_entry_agent_tasks():
907 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000908
909
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800910 @_calls_log_tick_msg
showard8cc058f2009-09-08 16:26:33 +0000911 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000912 for entry in scheduler_models.HostQueueEntry.fetch(
913 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000914 task = entry.job.schedule_delayed_callback_task(entry)
915 if task:
showardd1195652009-12-08 22:21:02 +0000916 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000917
918
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800919 @_calls_log_tick_msg
jadmanski0afbb632008-06-06 21:10:57 +0000920 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700921 """
922 Looks through the afe_host_queue_entries for an aborted entry.
923
924 The aborted bit is set on an HQE in many ways, the most common
925 being when a user requests an abort through the frontend, which
926 results in an rpc from the afe to abort_host_queue_entries.
927 """
jamesrene7c65cb2010-06-08 20:38:10 +0000928 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000929 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700930 where='aborted=1 and complete=0'):
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800931
932 # If the job is running on a shard, let the shard handle aborting
933 # it and sync back the right status.
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -0800934 if entry.job.shard_id is not None and not server_utils.is_shard():
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800935 logging.info('Waiting for shard %s to abort hqe %s',
936 entry.job.shard_id, entry)
937 continue
938
showardf4a2e502009-07-28 20:06:39 +0000939 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800940
941 # The task would have started off with both is_complete and
942 # is_active = False. Aborted tasks are neither active nor complete.
943 # For all currently active tasks this will happen through the agent,
944 # but we need to manually update the special tasks that haven't
945 # started yet, because they don't have agents.
946 models.SpecialTask.objects.filter(is_active=False,
947 queue_entry_id=entry.id).update(is_complete=True)
948
showardd3dc1992009-04-22 21:01:40 +0000949 for agent in self.get_agents_for_entry(entry):
950 agent.abort()
951 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000952 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700953 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000954 for job in jobs_to_stop:
955 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000956
957
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -0800958 @_calls_log_tick_msg
beeps8bb1f7d2013-08-05 01:30:09 -0700959 def _find_aborted_special_tasks(self):
960 """
961 Find SpecialTasks that have been marked for abortion.
962
963 Poll the database looking for SpecialTasks that are active
964 and have been marked for abortion, then abort them.
965 """
966
967 # The completed and active bits are very important when it comes
968 # to scheduler correctness. The active bit is set through the prolog
969 # of a special task, and reset through the cleanup method of the
970 # SpecialAgentTask. The cleanup is called both through the abort and
971 # epilog. The complete bit is set in several places, and in general
972 # a hanging job will have is_active=1 is_complete=0, while a special
973 # task which completed will have is_active=0 is_complete=1. To check
974 # aborts we directly check active because the complete bit is set in
975 # several places, including the epilog of agent tasks.
976 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
977 is_aborted=True)
978 for task in aborted_tasks:
979 # There are 2 ways to get the agent associated with a task,
980 # through the host and through the hqe. A special task
981 # always needs a host, but doesn't always need a hqe.
982 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700983 if isinstance(agent.task, agent_task.SpecialAgentTask):
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000984
beeps8bb1f7d2013-08-05 01:30:09 -0700985 # The epilog preforms critical actions such as
986 # queueing the next SpecialTask, requeuing the
987 # hqe etc, however it doesn't actually kill the
988 # monitor process and set the 'done' bit. Epilogs
989 # assume that the job failed, and that the monitor
990 # process has already written an exit code. The
991 # done bit is a necessary condition for
992 # _handle_agents to schedule any more special
993 # tasks against the host, and it must be set
994 # in addition to is_active, is_complete and success.
995 agent.task.epilog()
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000996 agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700997
998
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700999 def _can_start_agent(self, agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001000 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +00001001 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +00001002 return True
1003 # don't allow any nonzero-process agents to run after we've reached a
1004 # limit (this avoids starvation of many-process agents)
1005 if have_reached_limit:
1006 return False
1007 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001008 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +00001009 agent.task.owner_username,
1010 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +00001011 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001012 return False
showard4c5374f2008-09-04 17:02:56 +00001013 return True
1014
1015
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -08001016 @_calls_log_tick_msg
jadmanski0afbb632008-06-06 21:10:57 +00001017 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -07001018 """
1019 Handles agents of the dispatcher.
1020
1021 Appropriate Agents are added to the dispatcher through
1022 _schedule_running_host_queue_entries. These agents each
1023 have a task. This method runs the agents task through
1024 agent.tick() leading to:
1025 agent.start
1026 prolog -> AgentTasks prolog
1027 For each queue entry:
1028 sets host status/status to Running
1029 set started_on in afe_host_queue_entries
1030 run -> AgentTasks run
1031 Creates PidfileRunMonitor
1032 Queues the autoserv command line for this AgentTask
1033 via the drone manager. These commands are executed
1034 through the drone managers execute actions.
1035 poll -> AgentTasks/BaseAgentTask poll
1036 checks the monitors exit_code.
1037 Executes epilog if task is finished.
1038 Executes AgentTasks _finish_task
1039 finish_task is usually responsible for setting the status
1040 of the HQE/host, and updating it's active and complete fileds.
1041
1042 agent.is_done
1043 Removed the agent from the dispatchers _agents queue.
1044 Is_done checks the finished bit on the agent, that is
1045 set based on the Agents task. During the agents poll
1046 we check to see if the monitor process has exited in
1047 it's finish method, and set the success member of the
1048 task based on this exit code.
1049 """
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001050 num_started_this_tick = 0
1051 num_finished_this_tick = 0
showard4c5374f2008-09-04 17:02:56 +00001052 have_reached_limit = False
1053 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -07001054 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +00001055 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -07001056 self._log_extra_msg('Processing Agent with Host Ids: %s and '
1057 'queue_entry ids:%s' % (agent.host_ids,
1058 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +00001059 if not agent.started:
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001060 if not self._can_start_agent(agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001061 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -07001062 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +00001063 continue
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001064 num_started_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001065 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +00001066 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -07001067 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +00001068 if agent.is_done():
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001069 num_finished_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001070 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +00001071 self.remove_agent(agent)
Prathmesh Prabhued7ece92016-11-23 11:19:43 -08001072
1073 metrics.Counter(
1074 'chromeos/autotest/scheduler/agent_processes_started'
1075 ).increment_by(num_started_this_tick)
1076 metrics.Counter(
1077 'chromeos/autotest/scheduler/agent_processes_finished'
1078 ).increment_by(num_finished_this_tick)
1079 num_agent_processes = _drone_manager.total_running_processes()
1080 metrics.Gauge(
1081 'chromeos/autotest/scheduler/agent_processes'
1082 ).set(num_agent_processes)
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001083 logging.info('%d running processes. %d added this tick.',
Prathmesh Prabhued7ece92016-11-23 11:19:43 -08001084 num_agent_processes, num_started_this_tick)
mbligh36768f02008-02-22 18:28:33 +00001085
1086
Prathmesh Prabhu7ab1ef62016-12-15 13:43:22 -08001087 def _log_tick_msg(self, msg):
1088 if self._tick_debug:
1089 logging.debug(msg)
1090
1091
1092 def _log_extra_msg(self, msg):
1093 if self._extra_debugging:
1094 logging.debug(msg)
1095
1096
Simran Basia858a232012-08-21 11:04:37 -07001097SiteDispatcher = utils.import_site_class(
1098 __file__, 'autotest_lib.scheduler.site_monitor_db',
1099 'SiteDispatcher', BaseDispatcher)
1100
1101class Dispatcher(SiteDispatcher):
1102 pass
1103
1104
mbligh36768f02008-02-22 18:28:33 +00001105class Agent(object):
showard77182562009-06-10 00:16:05 +00001106 """
Alex Miller47715eb2013-07-24 03:34:01 -07001107 An agent for use by the Dispatcher class to perform a task. An agent wraps
1108 around an AgentTask mainly to associate the AgentTask with the queue_entry
1109 and host ids.
showard77182562009-06-10 00:16:05 +00001110
1111 The following methods are required on all task objects:
1112 poll() - Called periodically to let the task check its status and
1113 update its internal state. If the task succeeded.
1114 is_done() - Returns True if the task is finished.
1115 abort() - Called when an abort has been requested. The task must
1116 set its aborted attribute to True if it actually aborted.
1117
1118 The following attributes are required on all task objects:
1119 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001120 success - bool, True if this task succeeded.
1121 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1122 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001123 """
1124
1125
showard418785b2009-11-23 20:19:59 +00001126 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001127 """
Alex Miller47715eb2013-07-24 03:34:01 -07001128 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001129 """
showard8cc058f2009-09-08 16:26:33 +00001130 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001131
showard77182562009-06-10 00:16:05 +00001132 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001133 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001134
showard8cc058f2009-09-08 16:26:33 +00001135 self.queue_entry_ids = task.queue_entry_ids
1136 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001137
showard8cc058f2009-09-08 16:26:33 +00001138 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001139 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001140
1141
jadmanski0afbb632008-06-06 21:10:57 +00001142 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001143 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001144 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001145 self.task.poll()
1146 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001147 self.finished = True
showardec113162008-05-08 00:52:49 +00001148
1149
jadmanski0afbb632008-06-06 21:10:57 +00001150 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001151 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001152
1153
showardd3dc1992009-04-22 21:01:40 +00001154 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001155 if self.task:
1156 self.task.abort()
1157 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001158 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001159 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001160
showardd3dc1992009-04-22 21:01:40 +00001161
beeps5e2bb4a2013-10-28 11:26:45 -07001162class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001163 """
1164 Common functionality for QueueTask and HostlessQueueTask
1165 """
1166 def __init__(self, queue_entries):
1167 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001168 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001169 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001170
1171
showard73ec0442009-02-07 02:05:20 +00001172 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001173 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001174
1175
jamesrenc44ae992010-02-19 00:12:54 +00001176 def _write_control_file(self, execution_path):
1177 control_path = _drone_manager.attach_file_to_execution(
1178 execution_path, self.job.control_file)
1179 return control_path
1180
1181
Aviv Keshet308e7362013-05-21 14:43:16 -07001182 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001183 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001184 execution_path = self.queue_entries[0].execution_path()
1185 control_path = self._write_control_file(execution_path)
1186 hostnames = ','.join(entry.host.hostname
1187 for entry in self.queue_entries
1188 if not entry.is_hostless())
1189
1190 execution_tag = self.queue_entries[0].execution_tag()
1191 params = _autoserv_command_line(
1192 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001193 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001194 _drone_manager.absolute_path(control_path)],
1195 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001196 if self.job.is_image_update_job():
1197 params += ['--image', self.job.update_image_path]
1198
jamesrenc44ae992010-02-19 00:12:54 +00001199 return params
showardd1195652009-12-08 22:21:02 +00001200
1201
    @property
    def num_processes(self):
        """Number of processes this task accounts for: one per queue entry."""
        return len(self.queue_entries)
1205
1206
    @property
    def owner_username(self):
        """Username of the job's owner (used for per-owner throttling)."""
        return self.job.owner
1210
1211
    def _working_directory(self):
        """@returns the execution path consistent across all queue entries."""
        return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001214
1215
jadmanski0afbb632008-06-06 21:10:57 +00001216 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001217 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001218 keyval_dict = self.job.keyval_dict()
1219 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001220 group_name = self.queue_entries[0].get_group_name()
1221 if group_name:
1222 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001223 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001224 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001225 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001226 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001227
1228
showard35162b02009-03-03 02:17:30 +00001229 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001230 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001231 _drone_manager.write_lines_to_file(error_file_path,
1232 [_LOST_PROCESS_ERROR])
1233
1234
showardd3dc1992009-04-22 21:01:40 +00001235 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001236 if not self.monitor:
1237 return
1238
showardd9205182009-04-27 20:09:55 +00001239 self._write_job_finished()
1240
showard35162b02009-03-03 02:17:30 +00001241 if self.monitor.lost_process:
1242 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001243
jadmanskif7fa2cc2008-10-01 14:13:23 +00001244
showardcbd74612008-11-19 21:42:02 +00001245 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001246 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001247 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001248 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001249 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001250
1251
jadmanskif7fa2cc2008-10-01 14:13:23 +00001252 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001253 if not self.monitor or not self.monitor.has_process():
1254 return
1255
jadmanskif7fa2cc2008-10-01 14:13:23 +00001256 # build up sets of all the aborted_by and aborted_on values
1257 aborted_by, aborted_on = set(), set()
1258 for queue_entry in self.queue_entries:
1259 if queue_entry.aborted_by:
1260 aborted_by.add(queue_entry.aborted_by)
1261 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1262 aborted_on.add(t)
1263
1264 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001265 # TODO(showard): this conditional is now obsolete, we just need to leave
1266 # it in temporarily for backwards compatibility over upgrades. delete
1267 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001268 assert len(aborted_by) <= 1
1269 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001270 aborted_by_value = aborted_by.pop()
1271 aborted_on_value = max(aborted_on)
1272 else:
1273 aborted_by_value = 'autotest_system'
1274 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001275
showarda0382352009-02-11 23:36:43 +00001276 self._write_keyval_after_job("aborted_by", aborted_by_value)
1277 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001278
showardcbd74612008-11-19 21:42:02 +00001279 aborted_on_string = str(datetime.datetime.fromtimestamp(
1280 aborted_on_value))
1281 self._write_status_comment('Job aborted by %s on %s' %
1282 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001283
1284
    def abort(self):
        # On abort, additionally record who/when aborted the job and write
        # the final job keyvals; order matters: the base abort runs first,
        # then the abort metadata, then the end-of-job finalization.
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()
showard21baa452008-10-21 00:08:39 +00001289
1290
    def epilog(self):
        # Normal-completion path: after the base epilog, write the
        # end-of-job keyvals (and lost-process marker if applicable).
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001294
1295
class QueueTask(AbstractQueueTask):
    """Runs autoserv for a job against the hosts of its queue entries."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)
        # Whether server-side packaging containers may be requested at all.
        self._enable_ssp_container = (
                global_config.global_config.get_config_value(
                        'AUTOSERV', 'enable_ssp_container', type=bool,
                        default=True))


    def prolog(self):
        """Verify entry/host statuses, then mark each host running and dirty."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)


    def _finish_task(self):
        """Advance queue entries to Gathering; hosts are set back to Running."""
        super(QueueTask, self)._finish_task()

        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Extend the base autoserv invocation with job-specific flags."""
        args = super(QueueTask, self)._command_line()
        # Server-side packaging: a server-type job that has not explicitly
        # opted out (require_ssp != False) runs in an SSP container when the
        # feature is enabled in the config.
        if (self._enable_ssp_container and
            self.job.control_type == control_data.CONTROL_TYPE.SERVER and
            self.job.require_ssp != False):
            args += ['--require-ssp']
            test_source_build = self.job.keyval_dict().get(
                    'test_source_build', None)
            if test_source_build:
                args += ['--test_source_build', test_source_build]
        if self.job.parent_job_id:
            args += ['--parent_job_id', str(self.job.parent_job_id)]
        return args + ['--verify_job_repo_url']
Alex Miller9f01d5d2013-08-08 02:26:01 -07001344
1345
class HostlessQueueTask(AbstractQueueTask):
    """Runs autoserv for a job that has a single, hostless queue entry."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()

        # A new job starts out in Starting status; each scheduler tick tries
        # to start such jobs, and if some limit was hit (e.g.
        # max_hostless_jobs_per_drone) the entry is left in Starting with no
        # autoserv process launched. In that case there is nothing to parse
        # or collect -- execution_subdir is still unset and advancing the
        # entry would raise -- so only entries that actually started move on
        # to Parsing.
        entry = self.queue_entries[0]
        if entry.status != models.HostQueueEntry.Status.STARTING:
            entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001373
1374
# Script entry point; main() is presumably defined earlier in this file
# (outside this chunk) -- it is not imported here.
if __name__ == '__main__':
    main()