#!/usr/bin/python

#pylint: disable=C0111

"""
Autotest scheduler
"""

import datetime
import gc
import logging
import optparse
import os
import signal
import sys
import time

import common
from autotest_lib.frontend import setup_django_environment

import django.db
from chromite.lib import metrics
from chromite.lib import ts_mon_config

from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend.afe import models, rpc_utils
from autotest_lib.scheduler import agent_task, drone_manager
from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
from autotest_lib.scheduler import postjob_task
from autotest_lib.scheduler import query_managers
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.server import autoserv_utils
from autotest_lib.server import system_utils
from autotest_lib.server import utils as server_utils
from autotest_lib.site_utils import metadata_reporter
from autotest_lib.site_utils import server_manager_utils


BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db_manager = None
_db = None
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)

_enable_ssp_container = global_config.global_config.get_config_value(
        'AUTOSERV', 'enable_ssp_container', type=bool,
        default=True)

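# Hedged example of how the two flags above could be overridden through the
# global config (sketch only -- the section names 'SCHEDULER' and 'AUTOSERV'
# and the key names are taken from the get_config_value() calls in this file;
# the values shown are made up):
#
#     [SCHEDULER]
#     inline_host_acquisition: True
#
#     [AUTOSERV]
#     enable_ssp_container: False
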
def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise scheduler_lib.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that the scheduler is running in a '
                            'production environment and can use a database '
                            'that is not hosted on localhost. If not set, '
                            'the scheduler will fail if the database is not '
                            'on localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    # Start the thread to report metadata.
    metadata_reporter.start()

    with ts_mon_config.SetupTsMonGlobalState('autotest_scheduler',
                                             indirect=True):

        try:
            initialize()
            dispatcher = Dispatcher()
            dispatcher.initialize(recover_hosts=options.recover_hosts)
            minimum_tick_sec = global_config.global_config.get_config_value(
                    scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)

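            # Pacing sketch (comment only; the numbers are hypothetical): with
            # minimum_tick_sec = 20 and a tick that took 5s, the loop below
            # sleeps the remaining 15s; a tick that took 25s sleeps only
            # 0.0001s, so a slow tick is never stretched out further.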
            while not _shutdown and not server._shutdown_scheduler:
                start = time.time()
                dispatcher.tick()
                curr_tick_sec = time.time() - start
                if (minimum_tick_sec > curr_tick_sec):
                    time.sleep(minimum_tick_sec - curr_tick_sec)
                else:
                    time.sleep(0.0001)
        except server_manager_utils.ServerActionError as e:
            # This error is expected when the server is not in primary status
            # for scheduler role. Thus do not send email for it.
            logging.exception(e)
        except Exception:
            email_manager.manager.log_stacktrace(
                    "Uncaught exception; terminating monitor_db")

        metadata_reporter.abort()
        email_manager.manager.send_queued_emails()
        server.shutdown()
        _drone_manager.shutdown()
        _db_manager.disconnect()


def handle_signal(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    # If the server database is enabled, check that this server has the
    # `scheduler` role. If it does not, an exception is raised and the
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    drone_list = system_utils.get_drones()
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    command = autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose, in_lab=True)
    return command


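# Rough usage sketch for _autoserv_command_line (illustrative only; the host
# and job names are made up and the exact flag set is whatever
# autoserv_utils.autoserv_run_job_command produces):
#
#     cmd = _autoserv_command_line('host1,host2', extra_args=[], job=some_job)
#     # -> a list of the form [<autoserv>, ..., '-m', 'host1,host2',
#     #    '-u', <job owner>, '-l', <job name>, ...], later queued on a drone.
#
# Only the -m/-u/-l flags above are taken from the docstring; everything else
# is an assumption.
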
class BaseDispatcher(object):


    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db, _drone_manager)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()
        # Execute all actions queued in the cleanup tasks. Scheduler tick will
        # run a refresh task first. If there is any action in the queue, refresh
        # will raise an exception.
        _drone_manager.execute_actions()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def _log_extra_msg(self, msg):
        if self._extra_debugging:
            logging.debug(msg)


    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.
        """
        timer = autotest_stats.Timer('scheduler.tick')
        system_utils.DroneCache.refresh()
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
        _drone_manager.trigger_refresh()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
        _drone_manager.sync_refresh()
        # _run_cleanup must be called between drone_manager.sync_refresh, and
        # drone_manager.execute_actions, as sync_refresh will clear the calls
        # queued in drones. Therefore, any action that calls drone.queue_call
        # to add calls to the drone._calls, should be after drone refresh is
        # completed and before drone_manager.execute_actions at the end of the
        # tick.
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
        metrics.Counter('chromeos/autotest/scheduler/tick').increment()


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)
            # If an ID has no more active agent associated, there is no need to
            # keep it in the dictionary. Otherwise, scheduler will keep an
            # unnecessarily big dictionary until being restarted.
            if not agent_dict[object_id]:
                agent_dict.pop(object_id)


    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatcher's list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to 1. a dict against the queue_entry_ids given to it 2. A dict
        against the host_ids given to it. So theoretically, a host can have
        any number of agents associated with it, and each of them can have
        any special agent task, though in practice we never see > 1
        agent/task per host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)

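    # Bookkeeping sketch (comment only; the ids are made up): adding an agent
    # whose task has host_ids=[7] and queue_entry_ids=[42] leaves
    #     self._host_agents        == {7: set([agent])}
    #     self._queue_entry_agents == {42: set([agent])}
    # so host_has_agent() answers True for host 7 and get_agents_for_entry()
    # returns [agent] for hqe 42; remove_agent() later pops both entries.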

    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqes in the specified states.

        Loosely this translates to taking an hqe in one of the specified
        states, say parsing, and getting an AgentTask for it, like the
        FinalReparseTask, through _get_agent_task_for_queue_entry. Each queue
        entry can only have one agent task at a time, but there might be
        multiple queue entries in the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)
        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
                'running', len(queue_entries))

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise scheduler_lib.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a
        SpecialAgentTask object contains an hqe, it's passed on to the special
        agent task, which creates a HostQueueEntry and saves it as its
        queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise scheduler_lib.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                           % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise scheduler_lib.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.

        Special tasks include PreJobTasks like verify, reset and cleanup.
        They are created through _schedule_new_jobs and associated with an hqe.
        This method translates SpecialTasks to the appropriate AgentTask and
        adds them to the dispatcher's agents list, so _handle_agents can
        execute them.
        """
        # When the host scheduler is responsible for acquisition we only want
        # to run tasks with leased hosts. All hqe tasks will already have
        # leased hosts, and we don't want to run frontend tasks till the host
        # scheduler has vetted the assignment. Note that this doesn't include
        # frontend tasks with hosts leased by other active hqes.
        for task in self._job_query_manager.get_prioritized_special_tasks(
                only_tasks_with_leased_hosts=not _inline_host_acquisition):
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where='locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next tick
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._job_query_manager.get_pending_queue_entries(
                only_hostless=not _inline_host_acquisition)
        if not queue_entries:
            return []
        return queue_entries


    def _schedule_hostless_job(self, queue_entry):
        """Schedule a hostless (suite) job.

        @param queue_entry: The queue_entry representing the hostless job.
        """
        self.add_agent_task(HostlessQueueTask(queue_entry))

        # Need to set execution_subdir before setting the status:
        # After a restart of the scheduler, agents will be restored for HQEs in
        # Starting, Running, Gathering, Parsing or Archiving. To do this, the
        # execution_subdir is needed. Therefore it must be set before entering
        # one of these states.
        # Otherwise, if the scheduler was interrupted between setting the
        # status and the execution_subdir, restoring agents after its restart
        # would fail.
        # Is there a way to get a status in one of these states without going
        # through this code? Following cases are possible:
        # - If it's aborted before being started:
        #   active bit will be 0, so there's nothing to parse, it will just be
        #   set to completed by _find_aborting. Critical statuses are skipped.
        # - If it's aborted or it fails after being started:
        #   It was started, so this code was executed.
        queue_entry.update_field('execution_subdir', 'hostless')
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_host_job(self, host, queue_entry):
        """Schedules a job on the given host.

        1. Assign the host to the hqe, if it isn't already assigned.
        2. Create a SpecialAgentTask for the hqe.
        3. Activate the hqe.

        @param queue_entry: The job to schedule.
        @param host: The host to schedule the job on.
        """
        if self.host_has_agent(host):
            host_agent_task = list(self._host_agents.get(host.id))[0].task
            subject = 'Host with agents assigned to an HQE'
            message = ('HQE: %s assigned host %s, but the host has '
                       'agent: %s for queue_entry %s. The HQE '
                       'will have to try and acquire a host next tick ' %
                       (queue_entry, host.hostname, host_agent_task,
                        host_agent_task.queue_entry))
            email_manager.manager.enqueue_notify_email(subject, message)
        else:
            self._host_scheduler.schedule_host_job(host, queue_entry)


    def _schedule_new_jobs(self):
        """
        Find any new HQEs and call schedule_pre_job_tasks for them.

        This involves setting the status of the HQE and creating a row in the
        db corresponding to the special task, through
        scheduler_models._queue_special_task. The new db row is then added as
        an agent to the dispatcher through _schedule_special_tasks and
        scheduled for execution on the drone through _handle_agents.
        """
        queue_entries = self._refresh_pending_queue_entries()

        key = 'scheduler.jobs_per_tick'
        new_hostless_jobs = 0
        new_jobs_with_hosts = 0
        new_jobs_need_hosts = 0
        host_jobs = []
        logging.debug('Processing %d queue_entries', len(queue_entries))

        for queue_entry in queue_entries:
            if queue_entry.is_hostless():
                self._schedule_hostless_job(queue_entry)
                new_hostless_jobs = new_hostless_jobs + 1
            else:
                host_jobs.append(queue_entry)
                new_jobs_need_hosts = new_jobs_need_hosts + 1

        autotest_stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
        if not host_jobs:
            return
        if not _inline_host_acquisition:
            message = ('Found %s jobs that need hosts though '
                       '_inline_host_acquisition=%s. Will acquire hosts.' %
                       ([str(job) for job in host_jobs],
                        _inline_host_acquisition))
            email_manager.manager.enqueue_notify_email(
                    'Processing unexpected host acquisition requests', message)
        jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
        for host_assignment in jobs_with_hosts:
            self._schedule_host_job(host_assignment.host, host_assignment.job)
            new_jobs_with_hosts = new_jobs_with_hosts + 1

        autotest_stats.Gauge(key).send('new_jobs_with_hosts',
                                       new_jobs_with_hosts)
        autotest_stats.Gauge(key).send('new_jobs_without_hosts',
                                       new_jobs_need_hosts -
                                       new_jobs_with_hosts)


    def _schedule_running_host_queue_entries(self):
        """
        Adds agents to the dispatcher.

        Any AgentTask, like the QueueTask, is wrapped in an Agent. The
        QueueTask, for example, will have a job with a control file, and
        the agent will have methods that poll, abort and check if the queue
        task is finished. The dispatcher runs the agent_task, as well as
        other agents in its _agents member, through _handle_agents, by
        calling the Agent's tick().

        This method creates an agent for each HQE in one of (starting, running,
        gathering, parsing, archiving) states, and adds it to the dispatcher so
        it is handled by _handle_agents.
        """
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _find_aborting(self):
        """
        Looks through the afe_host_queue_entries for an aborted entry.

        The aborted bit is set on an HQE in many ways, the most common
        being when a user requests an abort through the frontend, which
        results in an rpc from the afe to abort_host_queue_entries.
        """
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted=1 and complete=0'):

            # If the job is running on a shard, let the shard handle aborting
            # it and sync back the right status.
            if entry.job.shard_id is not None and not server_utils.is_shard():
                logging.info('Waiting for shard %s to abort hqe %s',
                             entry.job.shard_id, entry)
                continue

            logging.info('Aborting %s', entry)

            # The task would have started off with both is_complete and
            # is_active = False. Aborted tasks are neither active nor complete.
            # For all currently active tasks this will happen through the agent,
            # but we need to manually update the special tasks that haven't
            # started yet, because they don't have agents.
            models.SpecialTask.objects.filter(is_active=False,
                    queue_entry_id=entry.id).update(is_complete=True)

            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _find_aborted_special_tasks(self):
        """
        Find SpecialTasks that have been marked for abortion.

        Poll the database looking for SpecialTasks that are active
        and have been marked for abortion, then abort them.
        """

        # The completed and active bits are very important when it comes
        # to scheduler correctness. The active bit is set through the prolog
        # of a special task, and reset through the cleanup method of the
        # SpecialAgentTask. The cleanup is called both through the abort and
        # epilog. The complete bit is set in several places, and in general
        # a hanging job will have is_active=1 is_complete=0, while a special
        # task which completed will have is_active=0 is_complete=1. To check
        # aborts we directly check active because the complete bit is set in
        # several places, including the epilog of agent tasks.
        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
                                                          is_aborted=True)
        for task in aborted_tasks:
            # There are 2 ways to get the agent associated with a task,
            # through the host and through the hqe. A special task
            # always needs a host, but doesn't always need a hqe.
            for agent in self._host_agents.get(task.host.id, []):
                if isinstance(agent.task, agent_task.SpecialAgentTask):

                    # The epilog performs critical actions such as
                    # queueing the next SpecialTask, requeuing the
                    # hqe etc, however it doesn't actually kill the
                    # monitor process and set the 'done' bit. Epilogs
                    # assume that the job failed, and that the monitor
                    # process has already written an exit code. The
                    # done bit is a necessary condition for
                    # _handle_agents to schedule any more special
                    # tasks against the host, and it must be set
                    # in addition to is_active, is_complete and success.
                    agent.task.epilog()
                    agent.task.abort()


    def _can_start_agent(self, agent, have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        return True


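    # Throttling sketch (comment only; the numbers are hypothetical): if
    # max_runnable_processes() reports 10, an agent needing 4 processes may
    # start, while one needing 12 is refused; _handle_agents then flips
    # have_reached_limit so that only zero-process agents start for the rest
    # of this tick, which keeps many-process agents from being starved.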
    def _handle_agents(self):
        """
        Handles agents of the dispatcher.

        Appropriate Agents are added to the dispatcher through
        _schedule_running_host_queue_entries. These agents each
        have a task. This method runs each agent's task through
        agent.tick() leading to:
            agent.start
                prolog -> AgentTask's prolog
                    For each queue entry:
                        sets host status/status to Running
                        set started_on in afe_host_queue_entries
                run -> AgentTask's run
                    Creates PidfileRunMonitor
                    Queues the autoserv command line for this AgentTask
                    via the drone manager. These commands are executed
                    through the drone manager's execute actions.
                poll -> AgentTask's/BaseAgentTask's poll
                    checks the monitor's exit_code.
                    Executes epilog if task is finished.
                    Executes AgentTask's _finish_task
                finish_task is usually responsible for setting the status
                of the HQE/host, and updating its active and complete fields.

            agent.is_done
                Removes the agent from the dispatcher's _agents queue.
                is_done checks the finished bit on the agent, which is
                set based on the agent's task. During the agent's poll
                we check to see if the monitor process has exited in
                its finish method, and set the success member of the
                task based on this exit code.
        """
        num_started_this_tick = 0
        num_finished_this_tick = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                if not self._can_start_agent(agent, have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_tick += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                num_finished_this_tick += agent.task.num_processes
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
                'agents_started', num_started_this_tick)
        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
                'agents_finished', num_finished_this_tick)
        logging.info('%d running processes. %d added this tick.',
                     _drone_manager.total_running_processes(),
                     num_started_this_tick)


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


SiteDispatcher = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    pass


class Agent(object):
showard77182562009-06-10 00:16:05 +00001097 """
Alex Miller47715eb2013-07-24 03:34:01 -07001098 An agent for use by the Dispatcher class to perform a task. An agent wraps
1099 around an AgentTask mainly to associate the AgentTask with the queue_entry
1100 and host ids.
showard77182562009-06-10 00:16:05 +00001101
1102 The following methods are required on all task objects:
1103 poll() - Called periodically to let the task check its status and
1104 update its internal state and, once finished, record whether it succeeded.
1105 is_done() - Returns True if the task is finished.
1106 abort() - Called when an abort has been requested. The task must
1107 set its aborted attribute to True if it actually aborted.
1108
1109 The following attributes are required on all task objects:
1110 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001111 success - bool, True if this task succeeded.
1112 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1113 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001114 """
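    # A minimal sketch of a task satisfying the interface above (hypothetical,
    # for illustration only; real tasks subclass agent_task.AgentTask):
    #
    #     class ExampleTask(object):
    #         def __init__(self):
    #             self.queue_entry_ids = []
    #             self.host_ids = []
    #             self.aborted = False
    #             self.success = None
    #             self.num_processes = 1
    #             self._done = False
    #
    #         def poll(self):
    #             # A real task would check its process monitor here.
    #             self.success = True
    #             self._done = True
    #
    #         def is_done(self):
    #             return self._done
    #
    #         def abort(self):
    #             self.aborted = True
    #             self._done = True
    #
    # Wrapping it as Agent(ExampleTask()) and handing it to
    # Dispatcher.add_agent() would let the dispatcher's agent-handling
    # loop drive it to completion.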
1115
1116
showard418785b2009-11-23 20:19:59 +00001117 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001118 """
Alex Miller47715eb2013-07-24 03:34:01 -07001119 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001120 """
showard8cc058f2009-09-08 16:26:33 +00001121 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001122
showard77182562009-06-10 00:16:05 +00001123 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001124 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001125
showard8cc058f2009-09-08 16:26:33 +00001126 self.queue_entry_ids = task.queue_entry_ids
1127 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001128
showard8cc058f2009-09-08 16:26:33 +00001129 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001130 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001131
1132
jadmanski0afbb632008-06-06 21:10:57 +00001133 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001134 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001135 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001136 self.task.poll()
1137 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001138 self.finished = True
showardec113162008-05-08 00:52:49 +00001139
1140
jadmanski0afbb632008-06-06 21:10:57 +00001141 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001142 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001143
1144
showardd3dc1992009-04-22 21:01:40 +00001145 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001146 if self.task:
1147 self.task.abort()
1148 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001149 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001150 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001151
showardd3dc1992009-04-22 21:01:40 +00001152
beeps5e2bb4a2013-10-28 11:26:45 -07001153class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001154 """
1155 Common functionality for QueueTask and HostlessQueueTask
1156 """
1157 def __init__(self, queue_entries):
1158 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001159 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001160 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001161
1162
showard73ec0442009-02-07 02:05:20 +00001163 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001164 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001165
1166
jamesrenc44ae992010-02-19 00:12:54 +00001167 def _write_control_file(self, execution_path):
1168 control_path = _drone_manager.attach_file_to_execution(
1169 execution_path, self.job.control_file)
1170 return control_path
1171
1172
Aviv Keshet308e7362013-05-21 14:43:16 -07001173 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001174 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001175 execution_path = self.queue_entries[0].execution_path()
1176 control_path = self._write_control_file(execution_path)
1177 hostnames = ','.join(entry.host.hostname
1178 for entry in self.queue_entries
1179 if not entry.is_hostless())
1180
1181 execution_tag = self.queue_entries[0].execution_tag()
1182 params = _autoserv_command_line(
1183 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001184 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001185 _drone_manager.absolute_path(control_path)],
1186 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001187 if self.job.is_image_update_job():
1188 params += ['--image', self.job.update_image_path]
1189
jamesrenc44ae992010-02-19 00:12:54 +00001190 return params
showardd1195652009-12-08 22:21:02 +00001191
1192
1193 @property
1194 def num_processes(self):
1195 return len(self.queue_entries)
1196
1197
1198 @property
1199 def owner_username(self):
1200 return self.job.owner
1201
1202
1203 def _working_directory(self):
1204 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001205
1206
jadmanski0afbb632008-06-06 21:10:57 +00001207 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001208 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001209 keyval_dict = self.job.keyval_dict()
1210 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001211 group_name = self.queue_entries[0].get_group_name()
1212 if group_name:
1213 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001214 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001215 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001216 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001217 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001218
1219
showard35162b02009-03-03 02:17:30 +00001220 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001221 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001222 _drone_manager.write_lines_to_file(error_file_path,
1223 [_LOST_PROCESS_ERROR])
1224
1225
showardd3dc1992009-04-22 21:01:40 +00001226 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001227 if not self.monitor:
1228 return
1229
showardd9205182009-04-27 20:09:55 +00001230 self._write_job_finished()
1231
showard35162b02009-03-03 02:17:30 +00001232 if self.monitor.lost_process:
1233 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001234
jadmanskif7fa2cc2008-10-01 14:13:23 +00001235
showardcbd74612008-11-19 21:42:02 +00001236 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001237 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001238 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001239 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001240 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001241
1242
jadmanskif7fa2cc2008-10-01 14:13:23 +00001243 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001244 if not self.monitor or not self.monitor.has_process():
1245 return
1246
jadmanskif7fa2cc2008-10-01 14:13:23 +00001247 # build up sets of all the aborted_by and aborted_on values
1248 aborted_by, aborted_on = set(), set()
1249 for queue_entry in self.queue_entries:
1250 if queue_entry.aborted_by:
1251 aborted_by.add(queue_entry.aborted_by)
1252 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1253 aborted_on.add(t)
1254
1255 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001256 # TODO(showard): this conditional is now obsolete, we just need to leave
1257 # it in temporarily for backwards compatibility over upgrades. delete
1258 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001259 assert len(aborted_by) <= 1
1260 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001261 aborted_by_value = aborted_by.pop()
1262 aborted_on_value = max(aborted_on)
1263 else:
1264 aborted_by_value = 'autotest_system'
1265 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001266
showarda0382352009-02-11 23:36:43 +00001267 self._write_keyval_after_job("aborted_by", aborted_by_value)
1268 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001269
showardcbd74612008-11-19 21:42:02 +00001270 aborted_on_string = str(datetime.datetime.fromtimestamp(
1271 aborted_on_value))
1272 self._write_status_comment('Job aborted by %s on %s' %
1273 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001274
1275
jadmanski0afbb632008-06-06 21:10:57 +00001276 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001277 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001278 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001279 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001280
1281
jadmanski0afbb632008-06-06 21:10:57 +00001282 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001283 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001284 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001285
1286
1287class QueueTask(AbstractQueueTask):
1288 def __init__(self, queue_entries):
1289 super(QueueTask, self).__init__(queue_entries)
1290 self._set_ids(queue_entries=queue_entries)
1291
1292
1293 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001294 self._check_queue_entry_statuses(
1295 self.queue_entries,
1296 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
1297 models.HostQueueEntry.Status.RUNNING),
1298 allowed_host_statuses=(models.Host.Status.PENDING,
1299 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00001300
1301 super(QueueTask, self).prolog()
1302
1303 for queue_entry in self.queue_entries:
1304 self._write_host_keyvals(queue_entry.host)
1305 queue_entry.host.set_status(models.Host.Status.RUNNING)
1306 queue_entry.host.update_field('dirty', 1)
showarda9545c02009-12-18 22:44:26 +00001307
1308
1309 def _finish_task(self):
1310 super(QueueTask, self)._finish_task()
1311
1312 for queue_entry in self.queue_entries:
1313 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00001314 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00001315
1316
Alex Miller9f01d5d2013-08-08 02:26:01 -07001317 def _command_line(self):
Dan Shi36cfd832014-10-10 13:38:51 -07001318 invocation = super(QueueTask, self)._command_line()
1319 # Check if server-side packaging is needed.
1320 if (_enable_ssp_container and
1321 self.job.control_type == control_data.CONTROL_TYPE.SERVER and
1322 self.job.require_ssp != False):
Dan Shiec1d47d2015-02-13 11:38:13 -08001323 invocation += ['--require-ssp']
Dan Shi36cfd832014-10-10 13:38:51 -07001324 keyval_dict = self.job.keyval_dict()
1325 test_source_build = keyval_dict.get('test_source_build', None)
1326 if test_source_build:
1327 invocation += ['--test_source_build', test_source_build]
Dan Shi70647ca2015-07-16 22:52:35 -07001328 if self.job.parent_job_id:
1329 invocation += ['--parent_job_id', str(self.job.parent_job_id)]
Dan Shi36cfd832014-10-10 13:38:51 -07001330 return invocation + ['--verify_job_repo_url']
Alex Miller9f01d5d2013-08-08 02:26:01 -07001331
1332
Dan Shi1a189052013-10-28 14:41:35 -07001333class HostlessQueueTask(AbstractQueueTask):
mbligh4608b002010-01-05 18:22:35 +00001334 def __init__(self, queue_entry):
1335 super(HostlessQueueTask, self).__init__([queue_entry])
1336 self.queue_entry_ids = [queue_entry.id]
1337
1338
1339 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001340 super(HostlessQueueTask, self).prolog()
1341
1342
mbligh4608b002010-01-05 18:22:35 +00001343 def _finish_task(self):
1344 super(HostlessQueueTask, self)._finish_task()
Dan Shi76af8022013-10-19 01:59:49 -07001345
1346 # When a job is added to the database, its initial status is always
1347 # Starting. In a scheduler tick, the scheduler finds all jobs in Starting
1348 # status and checks whether any of them can be started. If it hits a
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001349 # limit, e.g. max_hostless_jobs_per_drone, it will
1350 # leave these jobs in Starting status. Otherwise, the jobs'
1351 # status will be changed to Running, and an autoserv process
1352 # will be started on a drone for each of these jobs.
Dan Shi76af8022013-10-19 01:59:49 -07001353 # If the entry is still in status Starting, the process has not started
1354 # yet. Therefore, there is no need to parse and collect logs. Without
1355 # this check, an exception would be raised by the scheduler because
1356 # execution_subdir for this queue entry does not have a value yet.
1357 hqe = self.queue_entries[0]
1358 if hqe.status != models.HostQueueEntry.Status.STARTING:
1359 hqe.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001360
1361
mbligh36768f02008-02-22 18:28:33 +00001362if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00001363 main()