blob: 09d7dd1e5a6a5ab8df6c73e54bed3d79ca4397b5 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
mbligh36768f02008-02-22 18:28:33 +00002"""
3Autotest scheduler
4"""
showard909c7a62008-07-15 21:52:38 +00005
Dan Shif6c65bd2014-08-29 16:15:07 -07006import datetime
7import gc
8import logging
9import optparse
10import os
11import signal
12import sys
13import time
showard402934a2009-12-21 22:20:47 +000014
Alex Miller05d7b4c2013-03-04 07:49:38 -080015import common
showard21baa452008-10-21 00:08:39 +000016from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000017
18import django.db
19
Dan Shiec1d47d2015-02-13 11:38:13 -080020from autotest_lib.client.common_lib import control_data
Prashanth B0e960282014-05-13 19:38:28 -070021from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070022from autotest_lib.client.common_lib import utils
Gabe Black1e1c41b2015-02-04 23:55:15 -080023from autotest_lib.client.common_lib.cros.graphite import autotest_stats
Prashanth B0e960282014-05-13 19:38:28 -070024from autotest_lib.frontend.afe import models, rpc_utils
Fang Dengc330bee2014-10-21 18:10:55 -070025from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070026from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
27from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070028from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070029from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070030from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000031from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080032from autotest_lib.scheduler import status_server, scheduler_config
Prashanth Bf66d51b2014-05-06 12:42:25 -070033from autotest_lib.scheduler import scheduler_lib
Aviv Keshet308e7362013-05-21 14:43:16 -070034from autotest_lib.server import autoserv_utils
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080035from autotest_lib.server import utils as server_utils
Dan Shib9144a42014-12-01 16:09:32 -080036from autotest_lib.site_utils import server_manager_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080037
showard549afad2009-08-20 23:33:36 +000038BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
39PID_FILE_PREFIX = 'monitor_db'
mblighb090f142008-02-27 21:33:46 +000040
mbligh36768f02008-02-22 18:28:33 +000041RESULTS_DIR = '.'
mbligh36768f02008-02-22 18:28:33 +000042AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
43
44if os.environ.has_key('AUTOTEST_DIR'):
jadmanski0afbb632008-06-06 21:10:57 +000045 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
mbligh36768f02008-02-22 18:28:33 +000046AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
47AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
48
49if AUTOTEST_SERVER_DIR not in sys.path:
jadmanski0afbb632008-06-06 21:10:57 +000050 sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000051
showard35162b02009-03-03 02:17:30 +000052# error message to leave in results dir when an autoserv process disappears
53# mysteriously
54_LOST_PROCESS_ERROR = """\
55Autoserv failed abnormally during execution for this job, probably due to a
56system error on the Autotest server. Full results may not be available. Sorry.
57"""
58
Prashanth B0e960282014-05-13 19:38:28 -070059_db_manager = None
Jakob Jülich36accc62014-07-23 10:26:55 -070060_db = None
mbligh36768f02008-02-22 18:28:33 +000061_shutdown = False
beeps5e2bb4a2013-10-28 11:26:45 -070062
63# These 2 globals are replaced for testing
64_autoserv_directory = autoserv_utils.autoserv_directory
65_autoserv_path = autoserv_utils.autoserv_path
mbligh4314a712008-02-29 22:44:30 +000066_testing_mode = False
jamesrenc44ae992010-02-19 00:12:54 +000067_drone_manager = None
Prashanth Bf66d51b2014-05-06 12:42:25 -070068_inline_host_acquisition = global_config.global_config.get_config_value(
69 scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
70 default=True)
71
Dan Shiec1d47d2015-02-13 11:38:13 -080072_enable_ssp_container = global_config.global_config.get_config_value(
73 'AUTOSERV', 'enable_ssp_container', type=bool,
74 default=True)
mbligh36768f02008-02-22 18:28:33 +000075
mbligh83c1e9e2009-05-01 23:10:41 +000076def _site_init_monitor_db_dummy():
77 return {}
78
79
jamesren76fcf192010-04-21 20:39:50 +000080def _verify_default_drone_set_exists():
81 if (models.DroneSet.drone_sets_enabled() and
82 not models.DroneSet.default_drone_set_name()):
Prashanth B0e960282014-05-13 19:38:28 -070083 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -080084 'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000085
86
87def _sanity_check():
88 """Make sure the configs are consistent before starting the scheduler"""
89 _verify_default_drone_set_exists()
90
91
mbligh36768f02008-02-22 18:28:33 +000092def main():
showard27f33872009-04-07 18:20:53 +000093 try:
showard549afad2009-08-20 23:33:36 +000094 try:
95 main_without_exception_handling()
96 except SystemExit:
97 raise
98 except:
99 logging.exception('Exception escaping in monitor_db')
100 raise
101 finally:
102 utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000103
104
105def main_without_exception_handling():
Prashanth B0e960282014-05-13 19:38:28 -0700106 scheduler_lib.setup_logging(
107 os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
108 os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
showard136e6dc2009-06-10 19:38:49 +0000109 usage = 'usage: %prog [options] results_dir'
jadmanski0afbb632008-06-06 21:10:57 +0000110 parser = optparse.OptionParser(usage)
111 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
112 action='store_true')
jadmanski0afbb632008-06-06 21:10:57 +0000113 parser.add_option('--test', help='Indicate that scheduler is under ' +
114 'test and should use dummy autoserv and no parsing',
115 action='store_true')
Dan Shif6c65bd2014-08-29 16:15:07 -0700116 parser.add_option('--production',
117 help=('Indicate that scheduler is running in production '
118 'environment and it can use database that is not '
119 'hosted in localhost. If it is set to False, '
120 'scheduler will fail if database is not in '
121 'localhost.'),
Dan Shi06b09b72014-09-09 16:06:17 -0700122 action='store_true', default=False)
jadmanski0afbb632008-06-06 21:10:57 +0000123 (options, args) = parser.parse_args()
124 if len(args) != 1:
125 parser.print_usage()
126 return
mbligh36768f02008-02-22 18:28:33 +0000127
Dan Shif6c65bd2014-08-29 16:15:07 -0700128 scheduler_lib.check_production_settings(options)
129
showard5613c662009-06-08 23:30:33 +0000130 scheduler_enabled = global_config.global_config.get_config_value(
131 scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
132
133 if not scheduler_enabled:
Aviv Keshet05c08ac2013-01-10 16:11:44 -0800134 logging.error("Scheduler not enabled, set enable_scheduler to true in "
135 "the global_config's SCHEDULER section to enable it. "
136 "Exiting.")
showard5613c662009-06-08 23:30:33 +0000137 sys.exit(1)
138
jadmanski0afbb632008-06-06 21:10:57 +0000139 global RESULTS_DIR
140 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +0000141
mbligh83c1e9e2009-05-01 23:10:41 +0000142 site_init = utils.import_site_function(__file__,
143 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
144 _site_init_monitor_db_dummy)
145 site_init()
146
showardcca334f2009-03-12 20:38:34 +0000147 # Change the cwd while running to avoid issues incase we were launched from
148 # somewhere odd (such as a random NFS home directory of the person running
149 # sudo to launch us as the appropriate user).
150 os.chdir(RESULTS_DIR)
151
jamesrenc7d387e2010-08-10 21:48:30 +0000152 # This is helpful for debugging why stuff a scheduler launches is
153 # misbehaving.
154 logging.info('os.environ: %s', os.environ)
showardc85c21b2008-11-24 22:17:37 +0000155
jadmanski0afbb632008-06-06 21:10:57 +0000156 if options.test:
157 global _autoserv_path
158 _autoserv_path = 'autoserv_dummy'
159 global _testing_mode
160 _testing_mode = True
mbligh36768f02008-02-22 18:28:33 +0000161
jamesrenc44ae992010-02-19 00:12:54 +0000162 server = status_server.StatusServer()
showardd1ee1dd2009-01-07 21:33:08 +0000163 server.start()
164
jadmanski0afbb632008-06-06 21:10:57 +0000165 try:
jamesrenc44ae992010-02-19 00:12:54 +0000166 initialize()
showardc5afc462009-01-13 00:09:39 +0000167 dispatcher = Dispatcher()
showard915958d2009-04-22 21:00:58 +0000168 dispatcher.initialize(recover_hosts=options.recover_hosts)
Shuqian Zhaoce59fe52015-03-18 10:53:58 -0700169 minimum_tick_sec = global_config.global_config.get_config_value(
170 scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)
showardc5afc462009-01-13 00:09:39 +0000171
Eric Lia82dc352011-02-23 13:15:52 -0800172 while not _shutdown and not server._shutdown_scheduler:
Shuqian Zhaoce59fe52015-03-18 10:53:58 -0700173 start = time.time()
jadmanski0afbb632008-06-06 21:10:57 +0000174 dispatcher.tick()
Shuqian Zhaoce59fe52015-03-18 10:53:58 -0700175 curr_tick_sec = time.time() - start
176 if (minimum_tick_sec > curr_tick_sec):
177 time.sleep(minimum_tick_sec - curr_tick_sec)
178 else:
179 time.sleep(0.0001)
Prashanth B4ec98672014-05-15 10:44:54 -0700180 except Exception:
showard170873e2009-01-07 00:22:26 +0000181 email_manager.manager.log_stacktrace(
182 "Uncaught exception; terminating monitor_db")
jadmanski0afbb632008-06-06 21:10:57 +0000183
showard170873e2009-01-07 00:22:26 +0000184 email_manager.manager.send_queued_emails()
showard55b4b542009-01-08 23:30:30 +0000185 server.shutdown()
showard170873e2009-01-07 00:22:26 +0000186 _drone_manager.shutdown()
Prashanth B0e960282014-05-13 19:38:28 -0700187 _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000188
189
Prashanth B4ec98672014-05-15 10:44:54 -0700190def handle_signal(signum, frame):
jadmanski0afbb632008-06-06 21:10:57 +0000191 global _shutdown
192 _shutdown = True
showardb18134f2009-03-20 20:52:18 +0000193 logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000194
195
jamesrenc44ae992010-02-19 00:12:54 +0000196def initialize():
showardb18134f2009-03-20 20:52:18 +0000197 logging.info("%s> dispatcher starting", time.strftime("%X %x"))
198 logging.info("My PID is %d", os.getpid())
mbligh36768f02008-02-22 18:28:33 +0000199
showard8de37132009-08-31 18:33:08 +0000200 if utils.program_is_alive(PID_FILE_PREFIX):
showard549afad2009-08-20 23:33:36 +0000201 logging.critical("monitor_db already running, aborting!")
202 sys.exit(1)
203 utils.write_pid(PID_FILE_PREFIX)
mblighfb676032009-04-01 18:25:38 +0000204
showardb1e51872008-10-07 11:08:18 +0000205 if _testing_mode:
206 global_config.global_config.override_config_value(
Prashanth Bf66d51b2014-05-06 12:42:25 -0700207 scheduler_lib.DB_CONFIG_SECTION, 'database',
208 'stresstest_autotest_web')
showardb1e51872008-10-07 11:08:18 +0000209
Dan Shib9144a42014-12-01 16:09:32 -0800210 # If server database is enabled, check if the server has role `scheduler`.
211 # If the server does not have scheduler role, exception will be raised and
212 # scheduler will not continue to run.
213 if server_manager_utils.use_server_db():
214 server_manager_utils.confirm_server_has_role(hostname='localhost',
215 role='scheduler')
216
jadmanski0afbb632008-06-06 21:10:57 +0000217 os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
Prashanth B0e960282014-05-13 19:38:28 -0700218 global _db_manager
219 _db_manager = scheduler_lib.ConnectionManager()
Jakob Jülich36accc62014-07-23 10:26:55 -0700220 global _db
221 _db = _db_manager.get_connection()
showardb18134f2009-03-20 20:52:18 +0000222 logging.info("Setting signal handler")
Prashanth B4ec98672014-05-15 10:44:54 -0700223 signal.signal(signal.SIGINT, handle_signal)
224 signal.signal(signal.SIGTERM, handle_signal)
jadmanski0afbb632008-06-06 21:10:57 +0000225
jamesrenc44ae992010-02-19 00:12:54 +0000226 initialize_globals()
227 scheduler_models.initialize()
228
Dan Shib9144a42014-12-01 16:09:32 -0800229 if server_manager_utils.use_server_db():
230 drone_list = server_manager_utils.get_drones()
231 else:
232 drones = global_config.global_config.get_config_value(
233 scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
234 drone_list = [hostname.strip() for hostname in drones.split(',')]
showard170873e2009-01-07 00:22:26 +0000235 results_host = global_config.global_config.get_config_value(
showardd1ee1dd2009-01-07 21:33:08 +0000236 scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
showard170873e2009-01-07 00:22:26 +0000237 _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)
238
showardb18134f2009-03-20 20:52:18 +0000239 logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000240
241
jamesrenc44ae992010-02-19 00:12:54 +0000242def initialize_globals():
243 global _drone_manager
244 _drone_manager = drone_manager.instance()
245
246
showarded2afea2009-07-07 20:54:07 +0000247def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
248 verbose=True):
showardf1ae3542009-05-11 19:26:02 +0000249 """
250 @returns The autoserv command line as a list of executable + parameters.
251
252 @param machines - string - A machine or comma separated list of machines
253 for the (-m) flag.
showardf1ae3542009-05-11 19:26:02 +0000254 @param extra_args - list - Additional arguments to pass to autoserv.
Aviv Keshet308e7362013-05-21 14:43:16 -0700255 @param job - Job object - If supplied, -u owner, -l name, --test-retry,
256 and client -c or server -s parameters will be added.
showardf1ae3542009-05-11 19:26:02 +0000257 @param queue_entry - A HostQueueEntry object - If supplied and no Job
258 object was supplied, this will be used to lookup the Job object.
259 """
Aviv Keshet308e7362013-05-21 14:43:16 -0700260 return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
261 machines, results_directory=drone_manager.WORKING_DIRECTORY,
262 extra_args=extra_args, job=job, queue_entry=queue_entry,
263 verbose=verbose)
showard87ba02a2009-04-20 19:37:32 +0000264
265
Simran Basia858a232012-08-21 11:04:37 -0700266class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800267
268
jadmanski0afbb632008-06-06 21:10:57 +0000269 def __init__(self):
270 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000271 self._last_clean_time = time.time()
Alex Millerac189f32014-06-23 13:55:23 -0700272 user_cleanup_time = scheduler_config.config.clean_interval_minutes
mblighf3294cc2009-04-08 21:17:38 +0000273 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
Jakob Jülich36accc62014-07-23 10:26:55 -0700274 _db, user_cleanup_time)
Prashanth B0e960282014-05-13 19:38:28 -0700275 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
Dan Shic458f662015-04-29 12:12:38 -0700276 _db, _drone_manager)
showard170873e2009-01-07 00:22:26 +0000277 self._host_agents = {}
278 self._queue_entry_agents = {}
showardf13a9e22009-12-18 22:54:09 +0000279 self._tick_count = 0
280 self._last_garbage_stats_time = time.time()
281 self._seconds_between_garbage_stats = 60 * (
282 global_config.global_config.get_config_value(
283 scheduler_config.CONFIG_SECTION,
Dale Curtis456d3c12011-07-19 11:42:51 -0700284 'gc_stats_interval_mins', type=int, default=6*60))
Simran Basi0ec94dd2012-08-28 09:50:10 -0700285 self._tick_debug = global_config.global_config.get_config_value(
286 scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
287 default=False)
Simran Basidef92872012-09-20 13:34:34 -0700288 self._extra_debugging = global_config.global_config.get_config_value(
289 scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
290 default=False)
mbligh36768f02008-02-22 18:28:33 +0000291
Prashanth Bf66d51b2014-05-06 12:42:25 -0700292 # If _inline_host_acquisition is set the scheduler will acquire and
293 # release hosts against jobs inline, with the tick. Otherwise the
294 # scheduler will only focus on jobs that already have hosts, and
295 # will not explicitly unlease a host when a job finishes using it.
296 self._job_query_manager = query_managers.AFEJobQueryManager()
297 self._host_scheduler = (host_scheduler.BaseHostScheduler()
298 if _inline_host_acquisition else
299 host_scheduler.DummyHostScheduler())
300
mbligh36768f02008-02-22 18:28:33 +0000301
showard915958d2009-04-22 21:00:58 +0000302 def initialize(self, recover_hosts=True):
303 self._periodic_cleanup.initialize()
304 self._24hr_upkeep.initialize()
Dan Shi55d58992015-05-05 09:10:02 -0700305 # Execute all actions queued in the cleanup tasks. Scheduler tick will
306 # run a refresh task first. If there is any action in the queue, refresh
307 # will raise an exception.
308 _drone_manager.execute_actions()
showard915958d2009-04-22 21:00:58 +0000309
jadmanski0afbb632008-06-06 21:10:57 +0000310 # always recover processes
311 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000312
jadmanski0afbb632008-06-06 21:10:57 +0000313 if recover_hosts:
314 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000315
316
Simran Basi0ec94dd2012-08-28 09:50:10 -0700317 def _log_tick_msg(self, msg):
318 if self._tick_debug:
319 logging.debug(msg)
320
321
Simran Basidef92872012-09-20 13:34:34 -0700322 def _log_extra_msg(self, msg):
323 if self._extra_debugging:
324 logging.debug(msg)
325
326
jadmanski0afbb632008-06-06 21:10:57 +0000327 def tick(self):
Simran Basi0ec94dd2012-08-28 09:50:10 -0700328 """
329 This is an altered version of tick() where we keep track of when each
330 major step begins so we can try to figure out where we are using most
331 of the tick time.
332 """
Gabe Black1e1c41b2015-02-04 23:55:15 -0800333 timer = autotest_stats.Timer('scheduler.tick')
Simran Basi3f6717d2012-09-13 15:21:22 -0700334 self._log_tick_msg('Calling new tick, starting garbage collection().')
showardf13a9e22009-12-18 22:54:09 +0000335 self._garbage_collection()
Prashanth B340fd1e2014-06-22 12:44:10 -0700336 self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
337 _drone_manager.trigger_refresh()
Simran Basi3f6717d2012-09-13 15:21:22 -0700338 self._log_tick_msg('Calling _process_recurring_runs().')
showard29f7cd22009-04-29 21:16:24 +0000339 self._process_recurring_runs()
Simran Basi3f6717d2012-09-13 15:21:22 -0700340 self._log_tick_msg('Calling _schedule_delay_tasks().')
showard8cc058f2009-09-08 16:26:33 +0000341 self._schedule_delay_tasks()
Simran Basi3f6717d2012-09-13 15:21:22 -0700342 self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
showard8cc058f2009-09-08 16:26:33 +0000343 self._schedule_running_host_queue_entries()
Simran Basi3f6717d2012-09-13 15:21:22 -0700344 self._log_tick_msg('Calling _schedule_special_tasks().')
showard8cc058f2009-09-08 16:26:33 +0000345 self._schedule_special_tasks()
Simran Basi3f6717d2012-09-13 15:21:22 -0700346 self._log_tick_msg('Calling _schedule_new_jobs().')
showard65db3932009-10-28 19:54:35 +0000347 self._schedule_new_jobs()
Prashanth B340fd1e2014-06-22 12:44:10 -0700348 self._log_tick_msg('Calling _drone_manager.sync_refresh().')
349 _drone_manager.sync_refresh()
Dan Shi55d58992015-05-05 09:10:02 -0700350 # _run_cleanup must be called between drone_manager.sync_refresh, and
351 # drone_manager.execute_actions, as sync_refresh will clear the calls
352 # queued in drones. Therefore, any action that calls drone.queue_call
353 # to add calls to the drone._calls, should be after drone refresh is
354 # completed and before drone_manager.execute_actions at the end of the
355 # tick.
356 self._log_tick_msg('Calling _run_cleanup().')
357 self._run_cleanup()
Prashanth B67548092014-07-11 18:46:01 -0700358 self._log_tick_msg('Calling _find_aborting().')
359 self._find_aborting()
360 self._log_tick_msg('Calling _find_aborted_special_tasks().')
361 self._find_aborted_special_tasks()
Simran Basi3f6717d2012-09-13 15:21:22 -0700362 self._log_tick_msg('Calling _handle_agents().')
jadmanski0afbb632008-06-06 21:10:57 +0000363 self._handle_agents()
Simran Basi3f6717d2012-09-13 15:21:22 -0700364 self._log_tick_msg('Calling _host_scheduler.tick().')
jamesrene21bf412010-02-26 02:30:07 +0000365 self._host_scheduler.tick()
Simran Basi3f6717d2012-09-13 15:21:22 -0700366 self._log_tick_msg('Calling _drone_manager.execute_actions().')
showard170873e2009-01-07 00:22:26 +0000367 _drone_manager.execute_actions()
Simran Basi3f6717d2012-09-13 15:21:22 -0700368 self._log_tick_msg('Calling '
Simran Basi0ec94dd2012-08-28 09:50:10 -0700369 'email_manager.manager.send_queued_emails().')
Fang Deng1d6c2a02013-04-17 15:25:45 -0700370 with timer.get_client('email_manager_send_queued_emails'):
371 email_manager.manager.send_queued_emails()
Simran Basi3f6717d2012-09-13 15:21:22 -0700372 self._log_tick_msg('Calling django.db.reset_queries().')
Fang Deng1d6c2a02013-04-17 15:25:45 -0700373 with timer.get_client('django_db_reset_queries'):
374 django.db.reset_queries()
showardf13a9e22009-12-18 22:54:09 +0000375 self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000376
showard97aed502008-11-04 02:01:24 +0000377
mblighf3294cc2009-04-08 21:17:38 +0000378 def _run_cleanup(self):
379 self._periodic_cleanup.run_cleanup_maybe()
380 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000381
mbligh36768f02008-02-22 18:28:33 +0000382
showardf13a9e22009-12-18 22:54:09 +0000383 def _garbage_collection(self):
384 threshold_time = time.time() - self._seconds_between_garbage_stats
385 if threshold_time < self._last_garbage_stats_time:
386 # Don't generate these reports very often.
387 return
388
389 self._last_garbage_stats_time = time.time()
390 # Force a full level 0 collection (because we can, it doesn't hurt
391 # at this interval).
392 gc.collect()
393 logging.info('Logging garbage collector stats on tick %d.',
394 self._tick_count)
395 gc_stats._log_garbage_collector_stats()
396
397
showard170873e2009-01-07 00:22:26 +0000398 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
399 for object_id in object_ids:
400 agent_dict.setdefault(object_id, set()).add(agent)
401
402
403 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
404 for object_id in object_ids:
405 assert object_id in agent_dict
406 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700407 # If an ID has no more active agent associated, there is no need to
408 # keep it in the dictionary. Otherwise, scheduler will keep an
409 # unnecessarily big dictionary until being restarted.
410 if not agent_dict[object_id]:
411 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000412
413
showardd1195652009-12-08 22:21:02 +0000414 def add_agent_task(self, agent_task):
beeps8bb1f7d2013-08-05 01:30:09 -0700415 """
416 Creates and adds an agent to the dispatchers list.
417
418 In creating the agent we also pass on all the queue_entry_ids and
419 host_ids from the special agent task. For every agent we create, we
420 add it to 1. a dict against the queue_entry_ids given to it 2. A dict
421 against the host_ids given to it. So theoritically, a host can have any
422 number of agents associated with it, and each of them can have any
423 special agent task, though in practice we never see > 1 agent/task per
424 host at any time.
425
426 @param agent_task: A SpecialTask for the agent to manage.
427 """
showardd1195652009-12-08 22:21:02 +0000428 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000429 self._agents.append(agent)
430 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000431 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
432 self._register_agent_for_ids(self._queue_entry_agents,
433 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000434
showard170873e2009-01-07 00:22:26 +0000435
436 def get_agents_for_entry(self, queue_entry):
437 """
438 Find agents corresponding to the specified queue_entry.
439 """
showardd3dc1992009-04-22 21:01:40 +0000440 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000441
442
443 def host_has_agent(self, host):
444 """
445 Determine if there is currently an Agent present using this host.
446 """
447 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000448
449
jadmanski0afbb632008-06-06 21:10:57 +0000450 def remove_agent(self, agent):
451 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000452 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
453 agent)
454 self._unregister_agent_for_ids(self._queue_entry_agents,
455 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000456
457
showard8cc058f2009-09-08 16:26:33 +0000458 def _host_has_scheduled_special_task(self, host):
459 return bool(models.SpecialTask.objects.filter(host__id=host.id,
460 is_active=False,
461 is_complete=False))
462
463
jadmanski0afbb632008-06-06 21:10:57 +0000464 def _recover_processes(self):
showardd1195652009-12-08 22:21:02 +0000465 agent_tasks = self._create_recovery_agent_tasks()
466 self._register_pidfiles(agent_tasks)
showard170873e2009-01-07 00:22:26 +0000467 _drone_manager.refresh()
showardd1195652009-12-08 22:21:02 +0000468 self._recover_tasks(agent_tasks)
showard8cc058f2009-09-08 16:26:33 +0000469 self._recover_pending_entries()
showardb8900452009-10-12 20:31:01 +0000470 self._check_for_unrecovered_verifying_entries()
showard170873e2009-01-07 00:22:26 +0000471 self._reverify_remaining_hosts()
472 # reinitialize drones after killing orphaned processes, since they can
473 # leave around files when they die
474 _drone_manager.execute_actions()
475 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000476
showard170873e2009-01-07 00:22:26 +0000477
showardd1195652009-12-08 22:21:02 +0000478 def _create_recovery_agent_tasks(self):
479 return (self._get_queue_entry_agent_tasks()
480 + self._get_special_task_agent_tasks(is_active=True))
481
482
483 def _get_queue_entry_agent_tasks(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700484 """
485 Get agent tasks for all hqe in the specified states.
486
487 Loosely this translates to taking a hqe in one of the specified states,
488 say parsing, and getting an AgentTask for it, like the FinalReparseTask,
489 through _get_agent_task_for_queue_entry. Each queue entry can only have
490 one agent task at a time, but there might be multiple queue entries in
491 the group.
492
493 @return: A list of AgentTasks.
494 """
showardd1195652009-12-08 22:21:02 +0000495 # host queue entry statuses handled directly by AgentTasks (Verifying is
496 # handled through SpecialTasks, so is not listed here)
497 statuses = (models.HostQueueEntry.Status.STARTING,
498 models.HostQueueEntry.Status.RUNNING,
499 models.HostQueueEntry.Status.GATHERING,
mbligh4608b002010-01-05 18:22:35 +0000500 models.HostQueueEntry.Status.PARSING,
501 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000502 status_list = ','.join("'%s'" % status for status in statuses)
jamesrenc44ae992010-02-19 00:12:54 +0000503 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardd1195652009-12-08 22:21:02 +0000504 where='status IN (%s)' % status_list)
Gabe Black1e1c41b2015-02-04 23:55:15 -0800505 autotest_stats.Gauge('scheduler.jobs_per_tick').send(
Alex Miller47cd2472013-11-25 15:20:04 -0800506 'running', len(queue_entries))
showardd1195652009-12-08 22:21:02 +0000507
508 agent_tasks = []
509 used_queue_entries = set()
510 for entry in queue_entries:
511 if self.get_agents_for_entry(entry):
512 # already being handled
513 continue
514 if entry in used_queue_entries:
515 # already picked up by a synchronous job
516 continue
517 agent_task = self._get_agent_task_for_queue_entry(entry)
518 agent_tasks.append(agent_task)
519 used_queue_entries.update(agent_task.queue_entries)
520 return agent_tasks
showard170873e2009-01-07 00:22:26 +0000521
522
showardd1195652009-12-08 22:21:02 +0000523 def _get_special_task_agent_tasks(self, is_active=False):
524 special_tasks = models.SpecialTask.objects.filter(
525 is_active=is_active, is_complete=False)
526 return [self._get_agent_task_for_special_task(task)
527 for task in special_tasks]
528
529
530 def _get_agent_task_for_queue_entry(self, queue_entry):
531 """
beeps8bb1f7d2013-08-05 01:30:09 -0700532 Construct an AgentTask instance for the given active HostQueueEntry.
533
showardd1195652009-12-08 22:21:02 +0000534 @param queue_entry: a HostQueueEntry
beeps8bb1f7d2013-08-05 01:30:09 -0700535 @return: an AgentTask to run the queue entry
showardd1195652009-12-08 22:21:02 +0000536 """
537 task_entries = queue_entry.job.get_group_entries(queue_entry)
538 self._check_for_duplicate_host_entries(task_entries)
539
540 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
541 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000542 if queue_entry.is_hostless():
543 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000544 return QueueTask(queue_entries=task_entries)
545 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
beeps5e2bb4a2013-10-28 11:26:45 -0700546 return postjob_task.GatherLogsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000547 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
beeps5e2bb4a2013-10-28 11:26:45 -0700548 return postjob_task.FinalReparseTask(queue_entries=task_entries)
mbligh4608b002010-01-05 18:22:35 +0000549 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
beeps5e2bb4a2013-10-28 11:26:45 -0700550 return postjob_task.ArchiveResultsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000551
Prashanth B0e960282014-05-13 19:38:28 -0700552 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800553 '_get_agent_task_for_queue_entry got entry with '
554 'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000555
556
557 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000558 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
559 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000560 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000561 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000562 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000563 if using_host:
showardd1195652009-12-08 22:21:02 +0000564 self._assert_host_has_no_agent(task_entry)
565
566
567 def _assert_host_has_no_agent(self, entry):
568 """
569 @param entry: a HostQueueEntry or a SpecialTask
570 """
571 if self.host_has_agent(entry.host):
572 agent = tuple(self._host_agents.get(entry.host.id))[0]
Prashanth B0e960282014-05-13 19:38:28 -0700573 raise scheduler_lib.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000574 'While scheduling %s, host %s already has a host agent %s'
575 % (entry, entry.host, agent.task))
576
577
578 def _get_agent_task_for_special_task(self, special_task):
579 """
580 Construct an AgentTask class to run the given SpecialTask and add it
581 to this dispatcher.
beeps8bb1f7d2013-08-05 01:30:09 -0700582
MK Ryu35d661e2014-09-25 17:44:10 -0700583 A special task is created through schedule_special_tasks, but only if
beeps8bb1f7d2013-08-05 01:30:09 -0700584 the host doesn't already have an agent. This happens through
585 add_agent_task. All special agent tasks are given a host on creation,
586 and a Null hqe. To create a SpecialAgentTask object, you need a
587 models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
588 object contains a hqe it's passed on to the special agent task, which
589 creates a HostQueueEntry and saves it as it's queue_entry.
590
showardd1195652009-12-08 22:21:02 +0000591 @param special_task: a models.SpecialTask instance
592 @returns an AgentTask to run this SpecialTask
593 """
594 self._assert_host_has_no_agent(special_task)
595
beeps5e2bb4a2013-10-28 11:26:45 -0700596 special_agent_task_classes = (prejob_task.CleanupTask,
597 prejob_task.VerifyTask,
598 prejob_task.RepairTask,
599 prejob_task.ResetTask,
600 prejob_task.ProvisionTask)
601
showardd1195652009-12-08 22:21:02 +0000602 for agent_task_class in special_agent_task_classes:
603 if agent_task_class.TASK_TYPE == special_task.task:
604 return agent_task_class(task=special_task)
605
Prashanth B0e960282014-05-13 19:38:28 -0700606 raise scheduler_lib.SchedulerError(
Dale Curtisaa513362011-03-01 17:27:44 -0800607 'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000608
609
610 def _register_pidfiles(self, agent_tasks):
611 for agent_task in agent_tasks:
612 agent_task.register_necessary_pidfiles()
613
614
615 def _recover_tasks(self, agent_tasks):
616 orphans = _drone_manager.get_orphaned_autoserv_processes()
617
618 for agent_task in agent_tasks:
619 agent_task.recover()
620 if agent_task.monitor and agent_task.monitor.has_process():
621 orphans.discard(agent_task.monitor.get_process())
622 self.add_agent_task(agent_task)
623
624 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000625
626
showard8cc058f2009-09-08 16:26:33 +0000627 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000628 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
629 % status):
showard0db3d432009-10-12 20:29:15 +0000630 if entry.status == status and not self.get_agents_for_entry(entry):
631 # The status can change during iteration, e.g., if job.run()
632 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000633 yield entry
634
635
showard6878e8b2009-07-20 22:37:45 +0000636 def _check_for_remaining_orphan_processes(self, orphans):
637 if not orphans:
638 return
639 subject = 'Unrecovered orphan autoserv processes remain'
640 message = '\n'.join(str(process) for process in orphans)
641 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000642
643 die_on_orphans = global_config.global_config.get_config_value(
644 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
645
646 if die_on_orphans:
647 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000648
showard170873e2009-01-07 00:22:26 +0000649
showard8cc058f2009-09-08 16:26:33 +0000650 def _recover_pending_entries(self):
651 for entry in self._get_unassigned_entries(
652 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000653 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000654 entry.on_pending()
655
656
showardb8900452009-10-12 20:31:01 +0000657 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000658 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000659 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
660 unrecovered_hqes = []
661 for queue_entry in queue_entries:
662 special_tasks = models.SpecialTask.objects.filter(
663 task__in=(models.SpecialTask.Task.CLEANUP,
664 models.SpecialTask.Task.VERIFY),
665 queue_entry__id=queue_entry.id,
666 is_complete=False)
667 if special_tasks.count() == 0:
668 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000669
showardb8900452009-10-12 20:31:01 +0000670 if unrecovered_hqes:
671 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700672 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000673 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000674 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000675
676
showard65db3932009-10-28 19:54:35 +0000677 def _schedule_special_tasks(self):
678 """
679 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700680
681 Special tasks include PreJobTasks like verify, reset and cleanup.
682 They are created through _schedule_new_jobs and associated with a hqe
683 This method translates SpecialTasks to the appropriate AgentTask and
684 adds them to the dispatchers agents list, so _handle_agents can execute
685 them.
showard65db3932009-10-28 19:54:35 +0000686 """
Prashanth B4ec98672014-05-15 10:44:54 -0700687 # When the host scheduler is responsible for acquisition we only want
688 # to run tasks with leased hosts. All hqe tasks will already have
689 # leased hosts, and we don't want to run frontend tasks till the host
690 # scheduler has vetted the assignment. Note that this doesn't include
691 # frontend tasks with hosts leased by other active hqes.
692 for task in self._job_query_manager.get_prioritized_special_tasks(
693 only_tasks_with_leased_hosts=not _inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000694 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000695 continue
showardd1195652009-12-08 22:21:02 +0000696 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000697
698
showard170873e2009-01-07 00:22:26 +0000699 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000700 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000701 # should never happen
showarded2afea2009-07-07 20:54:07 +0000702 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000703 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000704 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700705 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000706 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000707
708
jadmanski0afbb632008-06-06 21:10:57 +0000709 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000710 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700711 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000712 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000713 if self.host_has_agent(host):
714 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000715 continue
showard8cc058f2009-09-08 16:26:33 +0000716 if self._host_has_scheduled_special_task(host):
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700717 # host will have a special task scheduled on the next tick
showard8cc058f2009-09-08 16:26:33 +0000718 continue
showard170873e2009-01-07 00:22:26 +0000719 if print_message:
showardb18134f2009-03-20 20:52:18 +0000720 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000721 models.SpecialTask.objects.create(
722 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000723 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000724
725
jadmanski0afbb632008-06-06 21:10:57 +0000726 def _recover_hosts(self):
727 # recover "Repair Failed" hosts
728 message = 'Reverifying dead host %s'
729 self._reverify_hosts_where("status = 'Repair Failed'",
730 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000731
732
showard89f84db2009-03-12 20:39:13 +0000733 def _refresh_pending_queue_entries(self):
734 """
735 Lookup the pending HostQueueEntries and call our HostScheduler
736 refresh() method given that list. Return the list.
737
738 @returns A list of pending HostQueueEntries sorted in priority order.
739 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700740 queue_entries = self._job_query_manager.get_pending_queue_entries(
741 only_hostless=not _inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000742 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000743 return []
showard89f84db2009-03-12 20:39:13 +0000744 return queue_entries
745
746
showarda9545c02009-12-18 22:44:26 +0000747 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800748 """Schedule a hostless (suite) job.
749
750 @param queue_entry: The queue_entry representing the hostless job.
751 """
showarda9545c02009-12-18 22:44:26 +0000752 self.add_agent_task(HostlessQueueTask(queue_entry))
Jakob Juelichd615a1e2014-09-04 11:48:36 -0700753
754 # Need to set execution_subdir before setting the status:
755 # After a restart of the scheduler, agents will be restored for HQEs in
756 # Starting, Running, Gathering, Parsing or Archiving. To do this, the
757 # execution_subdir is needed. Therefore it must be set before entering
758 # one of these states.
759 # Otherwise, if the scheduler was interrupted between setting the status
760 # and the execution_subdir, upon it's restart restoring agents would
761 # fail.
762 # Is there a way to get a status in one of these states without going
763 # through this code? Following cases are possible:
764 # - If it's aborted before being started:
765 # active bit will be 0, so there's nothing to parse, it will just be
766 # set to completed by _find_aborting. Critical statuses are skipped.
767 # - If it's aborted or it fails after being started:
768 # It was started, so this code was executed.
769 queue_entry.update_field('execution_subdir', 'hostless')
jamesren47bd7372010-03-13 00:58:17 +0000770 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000771
772
beepscc9fc702013-12-02 12:45:38 -0800773 def _schedule_host_job(self, host, queue_entry):
774 """Schedules a job on the given host.
775
776 1. Assign the host to the hqe, if it isn't already assigned.
777 2. Create a SpecialAgentTask for the hqe.
778 3. Activate the hqe.
779
780 @param queue_entry: The job to schedule.
781 @param host: The host to schedule the job on.
782 """
783 if self.host_has_agent(host):
784 host_agent_task = list(self._host_agents.get(host.id))[0].task
785 subject = 'Host with agents assigned to an HQE'
786 message = ('HQE: %s assigned host %s, but the host has '
787 'agent: %s for queue_entry %s. The HQE '
beepsf7d35162014-02-13 16:42:13 -0800788 'will have to try and acquire a host next tick ' %
beepscc9fc702013-12-02 12:45:38 -0800789 (queue_entry, host.hostname, host_agent_task,
790 host_agent_task.queue_entry))
791 email_manager.manager.enqueue_notify_email(subject, message)
beepscc9fc702013-12-02 12:45:38 -0800792 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700793 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800794
795
showard89f84db2009-03-12 20:39:13 +0000796 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700797 """
798 Find any new HQEs and call schedule_pre_job_tasks for it.
799
800 This involves setting the status of the HQE and creating a row in the
801 db corresponding the the special task, through
802 scheduler_models._queue_special_task. The new db row is then added as
803 an agent to the dispatcher through _schedule_special_tasks and
804 scheduled for execution on the drone through _handle_agents.
805 """
showard89f84db2009-03-12 20:39:13 +0000806 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000807
beepscc9fc702013-12-02 12:45:38 -0800808 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700809 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700810 new_jobs_with_hosts = 0
811 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800812 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700813 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000814
beepscc9fc702013-12-02 12:45:38 -0800815 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000816 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000817 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700818 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000819 else:
beepscc9fc702013-12-02 12:45:38 -0800820 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700821 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700822
Gabe Black1e1c41b2015-02-04 23:55:15 -0800823 autotest_stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
beepscc9fc702013-12-02 12:45:38 -0800824 if not host_jobs:
825 return
Prashanth Bf66d51b2014-05-06 12:42:25 -0700826 if not _inline_host_acquisition:
827 message = ('Found %s jobs that need hosts though '
828 '_inline_host_acquisition=%s. Will acquire hosts.' %
829 ([str(job) for job in host_jobs],
830 _inline_host_acquisition))
831 email_manager.manager.enqueue_notify_email(
832 'Processing unexpected host acquisition requests', message)
833 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
834 for host_assignment in jobs_with_hosts:
835 self._schedule_host_job(host_assignment.host, host_assignment.job)
836 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800837
Gabe Black1e1c41b2015-02-04 23:55:15 -0800838 autotest_stats.Gauge(key).send('new_jobs_with_hosts',
839 new_jobs_with_hosts)
840 autotest_stats.Gauge(key).send('new_jobs_without_hosts',
841 new_jobs_need_hosts -
842 new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000843
844
showard8cc058f2009-09-08 16:26:33 +0000845 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700846 """
847 Adds agents to the dispatcher.
848
849 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
850 QueueTask for example, will have a job with a control file, and
851 the agent will have methods that poll, abort and check if the queue
852 task is finished. The dispatcher runs the agent_task, as well as
853 other agents in it's _agents member, through _handle_agents, by
854 calling the Agents tick().
855
856 This method creates an agent for each HQE in one of (starting, running,
857 gathering, parsing, archiving) states, and adds it to the dispatcher so
858 it is handled by _handle_agents.
859 """
showardd1195652009-12-08 22:21:02 +0000860 for agent_task in self._get_queue_entry_agent_tasks():
861 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000862
863
864 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000865 for entry in scheduler_models.HostQueueEntry.fetch(
866 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000867 task = entry.job.schedule_delayed_callback_task(entry)
868 if task:
showardd1195652009-12-08 22:21:02 +0000869 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000870
871
jadmanski0afbb632008-06-06 21:10:57 +0000872 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700873 """
874 Looks through the afe_host_queue_entries for an aborted entry.
875
876 The aborted bit is set on an HQE in many ways, the most common
877 being when a user requests an abort through the frontend, which
878 results in an rpc from the afe to abort_host_queue_entries.
879 """
jamesrene7c65cb2010-06-08 20:38:10 +0000880 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000881 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700882 where='aborted=1 and complete=0'):
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800883
884 # If the job is running on a shard, let the shard handle aborting
885 # it and sync back the right status.
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -0800886 if entry.job.shard_id is not None and not server_utils.is_shard():
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800887 logging.info('Waiting for shard %s to abort hqe %s',
888 entry.job.shard_id, entry)
889 continue
890
showardf4a2e502009-07-28 20:06:39 +0000891 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800892
893 # The task would have started off with both is_complete and
894 # is_active = False. Aborted tasks are neither active nor complete.
895 # For all currently active tasks this will happen through the agent,
896 # but we need to manually update the special tasks that haven't
897 # started yet, because they don't have agents.
898 models.SpecialTask.objects.filter(is_active=False,
899 queue_entry_id=entry.id).update(is_complete=True)
900
showardd3dc1992009-04-22 21:01:40 +0000901 for agent in self.get_agents_for_entry(entry):
902 agent.abort()
903 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000904 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700905 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000906 for job in jobs_to_stop:
907 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000908
909
beeps8bb1f7d2013-08-05 01:30:09 -0700910 def _find_aborted_special_tasks(self):
911 """
912 Find SpecialTasks that have been marked for abortion.
913
914 Poll the database looking for SpecialTasks that are active
915 and have been marked for abortion, then abort them.
916 """
917
918 # The completed and active bits are very important when it comes
919 # to scheduler correctness. The active bit is set through the prolog
920 # of a special task, and reset through the cleanup method of the
921 # SpecialAgentTask. The cleanup is called both through the abort and
922 # epilog. The complete bit is set in several places, and in general
923 # a hanging job will have is_active=1 is_complete=0, while a special
924 # task which completed will have is_active=0 is_complete=1. To check
925 # aborts we directly check active because the complete bit is set in
926 # several places, including the epilog of agent tasks.
927 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
928 is_aborted=True)
929 for task in aborted_tasks:
930 # There are 2 ways to get the agent associated with a task,
931 # through the host and through the hqe. A special task
932 # always needs a host, but doesn't always need a hqe.
933 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700934 if isinstance(agent.task, agent_task.SpecialAgentTask):
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000935
beeps8bb1f7d2013-08-05 01:30:09 -0700936 # The epilog preforms critical actions such as
937 # queueing the next SpecialTask, requeuing the
938 # hqe etc, however it doesn't actually kill the
939 # monitor process and set the 'done' bit. Epilogs
940 # assume that the job failed, and that the monitor
941 # process has already written an exit code. The
942 # done bit is a necessary condition for
943 # _handle_agents to schedule any more special
944 # tasks against the host, and it must be set
945 # in addition to is_active, is_complete and success.
946 agent.task.epilog()
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000947 agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700948
949
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700950 def _can_start_agent(self, agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000951 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000952 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000953 return True
954 # don't allow any nonzero-process agents to run after we've reached a
955 # limit (this avoids starvation of many-process agents)
956 if have_reached_limit:
957 return False
958 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000959 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000960 agent.task.owner_username,
961 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000962 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000963 return False
showard4c5374f2008-09-04 17:02:56 +0000964 return True
965
966
jadmanski0afbb632008-06-06 21:10:57 +0000967 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700968 """
969 Handles agents of the dispatcher.
970
971 Appropriate Agents are added to the dispatcher through
972 _schedule_running_host_queue_entries. These agents each
973 have a task. This method runs the agents task through
974 agent.tick() leading to:
975 agent.start
976 prolog -> AgentTasks prolog
977 For each queue entry:
978 sets host status/status to Running
979 set started_on in afe_host_queue_entries
980 run -> AgentTasks run
981 Creates PidfileRunMonitor
982 Queues the autoserv command line for this AgentTask
983 via the drone manager. These commands are executed
984 through the drone managers execute actions.
985 poll -> AgentTasks/BaseAgentTask poll
986 checks the monitors exit_code.
987 Executes epilog if task is finished.
988 Executes AgentTasks _finish_task
989 finish_task is usually responsible for setting the status
990 of the HQE/host, and updating it's active and complete fileds.
991
992 agent.is_done
993 Removed the agent from the dispatchers _agents queue.
994 Is_done checks the finished bit on the agent, that is
995 set based on the Agents task. During the agents poll
996 we check to see if the monitor process has exited in
997 it's finish method, and set the success member of the
998 task based on this exit code.
999 """
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001000 num_started_this_tick = 0
1001 num_finished_this_tick = 0
showard4c5374f2008-09-04 17:02:56 +00001002 have_reached_limit = False
1003 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -07001004 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +00001005 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -07001006 self._log_extra_msg('Processing Agent with Host Ids: %s and '
1007 'queue_entry ids:%s' % (agent.host_ids,
1008 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +00001009 if not agent.started:
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001010 if not self._can_start_agent(agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001011 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -07001012 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +00001013 continue
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001014 num_started_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001015 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +00001016 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -07001017 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +00001018 if agent.is_done():
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001019 num_finished_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001020 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +00001021 self.remove_agent(agent)
Gabe Black1e1c41b2015-02-04 23:55:15 -08001022 autotest_stats.Gauge('scheduler.jobs_per_tick').send(
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001023 'agents_started', num_started_this_tick)
Gabe Black1e1c41b2015-02-04 23:55:15 -08001024 autotest_stats.Gauge('scheduler.jobs_per_tick').send(
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001025 'agents_finished', num_finished_this_tick)
1026 logging.info('%d running processes. %d added this tick.',
Simran Basi3f6717d2012-09-13 15:21:22 -07001027 _drone_manager.total_running_processes(),
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001028 num_started_this_tick)
mbligh36768f02008-02-22 18:28:33 +00001029
1030
showard29f7cd22009-04-29 21:16:24 +00001031 def _process_recurring_runs(self):
1032 recurring_runs = models.RecurringRun.objects.filter(
1033 start_date__lte=datetime.datetime.now())
1034 for rrun in recurring_runs:
1035 # Create job from template
1036 job = rrun.job
1037 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001038 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001039
1040 host_objects = info['hosts']
1041 one_time_hosts = info['one_time_hosts']
1042 metahost_objects = info['meta_hosts']
1043 dependencies = info['dependencies']
1044 atomic_group = info['atomic_group']
1045
1046 for host in one_time_hosts or []:
1047 this_host = models.Host.create_one_time_host(host.hostname)
1048 host_objects.append(this_host)
1049
1050 try:
1051 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001052 options=options,
showard29f7cd22009-04-29 21:16:24 +00001053 host_objects=host_objects,
1054 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001055 atomic_group=atomic_group)
1056
1057 except Exception, ex:
1058 logging.exception(ex)
                # TODO: send email
1060
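            # loop_count semantics, as used below: 1 means this was the
            # final scheduled run, 0 means repeat forever, and any other
            # value counts down the remaining runs.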
1061 if rrun.loop_count == 1:
1062 rrun.delete()
1063 else:
1064 if rrun.loop_count != 0: # if not infinite loop
1065 # calculate new start_date
1066 difference = datetime.timedelta(seconds=rrun.loop_period)
1067 rrun.start_date = rrun.start_date + difference
1068 rrun.loop_count -= 1
1069 rrun.save()
1070
1071
Simran Basia858a232012-08-21 11:04:37 -07001072SiteDispatcher = utils.import_site_class(
1073 __file__, 'autotest_lib.scheduler.site_monitor_db',
1074 'SiteDispatcher', BaseDispatcher)
1075
1076class Dispatcher(SiteDispatcher):
1077 pass
1078
1079
mbligh36768f02008-02-22 18:28:33 +00001080class Agent(object):
showard77182562009-06-10 00:16:05 +00001081 """
Alex Miller47715eb2013-07-24 03:34:01 -07001082 An agent for use by the Dispatcher class to perform a task. An agent wraps
1083 around an AgentTask mainly to associate the AgentTask with the queue_entry
1084 and host ids.
showard77182562009-06-10 00:16:05 +00001085
1086 The following methods are required on all task objects:
    poll() - Called periodically to let the task check its status and
            update its internal state, including whether it succeeded.
1089 is_done() - Returns True if the task is finished.
1090 abort() - Called when an abort has been requested. The task must
1091 set its aborted attribute to True if it actually aborted.
1092
1093 The following attributes are required on all task objects:
1094 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001095 success - bool, True if this task succeeded.
1096 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1097 host_ids - A sequence of Host ids this task represents.
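
    A minimal, purely illustrative task satisfying this contract (the
    NoopTask name is hypothetical; real tasks subclass
    agent_task.AgentTask):

        class NoopTask(object):
            queue_entry_ids = ()
            host_ids = ()
            aborted = False
            success = False
            num_processes = 0   # used by the dispatcher for throttling
            _finished = False

            def poll(self):
                # Pretend the monitored process exited successfully.
                self._finished = True
                self.success = True

            def is_done(self):
                return self._finished

            def abort(self):
                self.aborted = True

        agent = Agent(NoopTask())
        agent.tick()                # starts the task and polls it once
        assert agent.is_done()      # finished flag set after the poll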
showard77182562009-06-10 00:16:05 +00001098 """
1099
1100
showard418785b2009-11-23 20:19:59 +00001101 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001102 """
Alex Miller47715eb2013-07-24 03:34:01 -07001103 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001104 """
showard8cc058f2009-09-08 16:26:33 +00001105 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001106
showard77182562009-06-10 00:16:05 +00001107 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001108 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001109
showard8cc058f2009-09-08 16:26:33 +00001110 self.queue_entry_ids = task.queue_entry_ids
1111 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001112
showard8cc058f2009-09-08 16:26:33 +00001113 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001114 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001115
1116
jadmanski0afbb632008-06-06 21:10:57 +00001117 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001118 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001119 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001120 self.task.poll()
1121 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001122 self.finished = True
showardec113162008-05-08 00:52:49 +00001123
1124
jadmanski0afbb632008-06-06 21:10:57 +00001125 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001126 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001127
1128
showardd3dc1992009-04-22 21:01:40 +00001129 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001130 if self.task:
1131 self.task.abort()
1132 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001133 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001134 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001135
showardd3dc1992009-04-22 21:01:40 +00001136
beeps5e2bb4a2013-10-28 11:26:45 -07001137class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001138 """
1139 Common functionality for QueueTask and HostlessQueueTask
1140 """
1141 def __init__(self, queue_entries):
1142 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001143 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001144 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001145
1146
showard73ec0442009-02-07 02:05:20 +00001147 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001148 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001149
1150
jamesrenc44ae992010-02-19 00:12:54 +00001151 def _write_control_file(self, execution_path):
1152 control_path = _drone_manager.attach_file_to_execution(
1153 execution_path, self.job.control_file)
1154 return control_path
1155
1156
Aviv Keshet308e7362013-05-21 14:43:16 -07001157 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001158 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001159 execution_path = self.queue_entries[0].execution_path()
1160 control_path = self._write_control_file(execution_path)
1161 hostnames = ','.join(entry.host.hostname
1162 for entry in self.queue_entries
1163 if not entry.is_hostless())
1164
1165 execution_tag = self.queue_entries[0].execution_tag()
1166 params = _autoserv_command_line(
1167 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001168 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001169 _drone_manager.absolute_path(control_path)],
1170 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001171 if self.job.is_image_update_job():
1172 params += ['--image', self.job.update_image_path]
1173
jamesrenc44ae992010-02-19 00:12:54 +00001174 return params
showardd1195652009-12-08 22:21:02 +00001175
1176
1177 @property
1178 def num_processes(self):
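        """Number of autoserv processes this task accounts for; the
        dispatcher uses it when counting started and finished processes
        against its per-tick limits.
        """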
1179 return len(self.queue_entries)
1180
1181
1182 @property
1183 def owner_username(self):
1184 return self.job.owner
1185
1186
1187 def _working_directory(self):
1188 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001189
1190
jadmanski0afbb632008-06-06 21:10:57 +00001191 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001192 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001193 keyval_dict = self.job.keyval_dict()
1194 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001195 group_name = self.queue_entries[0].get_group_name()
1196 if group_name:
1197 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001198 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001199 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001200 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001201 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001202
1203
showard35162b02009-03-03 02:17:30 +00001204 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001205 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001206 _drone_manager.write_lines_to_file(error_file_path,
1207 [_LOST_PROCESS_ERROR])
1208
1209
showardd3dc1992009-04-22 21:01:40 +00001210 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001211 if not self.monitor:
1212 return
1213
showardd9205182009-04-27 20:09:55 +00001214 self._write_job_finished()
1215
showard35162b02009-03-03 02:17:30 +00001216 if self.monitor.lost_process:
1217 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001218
jadmanskif7fa2cc2008-10-01 14:13:23 +00001219
showardcbd74612008-11-19 21:42:02 +00001220 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001221 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001222 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001223 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001224 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001225
1226
jadmanskif7fa2cc2008-10-01 14:13:23 +00001227 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001228 if not self.monitor or not self.monitor.has_process():
1229 return
1230
jadmanskif7fa2cc2008-10-01 14:13:23 +00001231 # build up sets of all the aborted_by and aborted_on values
1232 aborted_by, aborted_on = set(), set()
1233 for queue_entry in self.queue_entries:
1234 if queue_entry.aborted_by:
1235 aborted_by.add(queue_entry.aborted_by)
1236 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1237 aborted_on.add(t)
1238
1239 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001240 # TODO(showard): this conditional is now obsolete, we just need to leave
1241 # it in temporarily for backwards compatibility over upgrades. delete
1242 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001243 assert len(aborted_by) <= 1
1244 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001245 aborted_by_value = aborted_by.pop()
1246 aborted_on_value = max(aborted_on)
1247 else:
1248 aborted_by_value = 'autotest_system'
1249 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001250
showarda0382352009-02-11 23:36:43 +00001251 self._write_keyval_after_job("aborted_by", aborted_by_value)
1252 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001253
showardcbd74612008-11-19 21:42:02 +00001254 aborted_on_string = str(datetime.datetime.fromtimestamp(
1255 aborted_on_value))
1256 self._write_status_comment('Job aborted by %s on %s' %
1257 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001258
1259
jadmanski0afbb632008-06-06 21:10:57 +00001260 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001261 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001262 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001263 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001264
1265
jadmanski0afbb632008-06-06 21:10:57 +00001266 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001267 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001268 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001269
1270
1271class QueueTask(AbstractQueueTask):
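    """AbstractQueueTask for jobs that run against real hosts, keeping
    host and queue-entry statuses in step with the autoserv process.
    """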
1272 def __init__(self, queue_entries):
1273 super(QueueTask, self).__init__(queue_entries)
1274 self._set_ids(queue_entries=queue_entries)
1275
1276
1277 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001278 self._check_queue_entry_statuses(
1279 self.queue_entries,
1280 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
1281 models.HostQueueEntry.Status.RUNNING),
1282 allowed_host_statuses=(models.Host.Status.PENDING,
1283 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00001284
1285 super(QueueTask, self).prolog()
1286
1287 for queue_entry in self.queue_entries:
1288 self._write_host_keyvals(queue_entry.host)
1289 queue_entry.host.set_status(models.Host.Status.RUNNING)
1290 queue_entry.host.update_field('dirty', 1)
showarda9545c02009-12-18 22:44:26 +00001291
1292
1293 def _finish_task(self):
1294 super(QueueTask, self)._finish_task()
1295
1296 for queue_entry in self.queue_entries:
1297 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00001298 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00001299
1300
Alex Miller9f01d5d2013-08-08 02:26:01 -07001301 def _command_line(self):
1302 invocation = super(QueueTask, self)._command_line()
Dan Shiec1d47d2015-02-13 11:38:13 -08001303 # Check if server-side packaging is needed.
1304 if (_enable_ssp_container and
1305 self.job.control_type == control_data.CONTROL_TYPE.SERVER and
            self.job.require_ssp is not False):
1307 invocation += ['--require-ssp']
Alex Miller9f01d5d2013-08-08 02:26:01 -07001308 return invocation + ['--verify_job_repo_url']
1309
1310
Dan Shi1a189052013-10-28 14:41:35 -07001311class HostlessQueueTask(AbstractQueueTask):
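    """AbstractQueueTask for a hostless job: a single queue entry and no
    host assignment.
    """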
mbligh4608b002010-01-05 18:22:35 +00001312 def __init__(self, queue_entry):
1313 super(HostlessQueueTask, self).__init__([queue_entry])
1314 self.queue_entry_ids = [queue_entry.id]
1315
1316
1317 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001318 super(HostlessQueueTask, self).prolog()
1319
1320
mbligh4608b002010-01-05 18:22:35 +00001321 def _finish_task(self):
1322 super(HostlessQueueTask, self)._finish_task()
Dan Shi76af8022013-10-19 01:59:49 -07001323
        # When a job is added to the database, its initial status is always
        # Starting. In a scheduler tick, the scheduler finds all jobs in
        # Starting status and checks whether any of them can be started. If
        # the scheduler hits some limit, e.g. max_hostless_jobs_per_drone,
        # it leaves those jobs in Starting status. Otherwise, the jobs'
        # status is changed to Running and an autoserv process is started
        # in a drone for each of them.
        # If the entry is still in Starting status, the process has not
        # started yet, so there is nothing to parse and no logs to collect.
        # Without this check, the scheduler would raise an exception because
        # execution_subdir for this queue entry does not have a value yet.
1335 hqe = self.queue_entries[0]
1336 if hqe.status != models.HostQueueEntry.Status.STARTING:
1337 hqe.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001338
1339
mbligh36768f02008-02-22 18:28:33 +00001340if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00001341 main()