blob: ea57bcf0c79a2461ca218192939cb99d9771b9ac [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
Aviv Keshet225bdfe2013-03-05 10:10:08 -08002#pylint: disable-msg=C0111
mbligh36768f02008-02-22 18:28:33 +00003
4"""
5Autotest scheduler
6"""
showard909c7a62008-07-15 21:52:38 +00007
Aviv Keshet225bdfe2013-03-05 10:10:08 -08008import datetime, optparse, os, signal
beeps5e2bb4a2013-10-28 11:26:45 -07009import sys, time
Aviv Keshet225bdfe2013-03-05 10:10:08 -080010import logging, gc
showard402934a2009-12-21 22:20:47 +000011
Alex Miller05d7b4c2013-03-04 07:49:38 -080012import common
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000014
15import django.db
16
Prashanth B0e960282014-05-13 19:38:28 -070017from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070018from autotest_lib.client.common_lib import utils
Michael Liangda8c60a2014-06-03 13:24:51 -070019from autotest_lib.client.common_lib.cros.graphite import stats
Prashanth B0e960282014-05-13 19:38:28 -070020from autotest_lib.frontend.afe import models, rpc_utils
beeps5e2bb4a2013-10-28 11:26:45 -070021from autotest_lib.scheduler import agent_task, drone_manager, drones
22from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
23from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070024from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070025from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070026from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000027from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080028from autotest_lib.scheduler import status_server, scheduler_config
Prashanth Bf66d51b2014-05-06 12:42:25 -070029from autotest_lib.scheduler import scheduler_lib
Aviv Keshet308e7362013-05-21 14:43:16 -070030from autotest_lib.server import autoserv_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080031
# Prefixes handed to the pid-file helpers in utils (see initialize() and
# main() for the scheduler's own pid file).
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Overwritten with the results_dir command line argument in
# main_without_exception_handling().
RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# The AUTOTEST_DIR environment variable, when set, overrides the location
# derived from this file's path.  ('in' replaces the deprecated
# dict.has_key(), which no longer exists in Python 3.)
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

# Make the server directory importable, without adding it twice.
if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000045
# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Database connection manager and connection; both are assigned in
# initialize().
_db_manager = None
_db = None
# Set to True by handle_signal(); polled by the main loop in
# main_without_exception_handling().
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
# Switched on by the --test command line option.
_testing_mode = False
# Assigned in initialize_globals().
_drone_manager = None
# Read once at import time; selects which host scheduler class
# BaseDispatcher.__init__ instantiates.
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)
65
mbligh36768f02008-02-22 18:28:33 +000066
def _parser_path_default(install_dir):
    """
    Build the default location of the tko parser.

    @param install_dir: Directory the path is built under.
    @returns install_dir joined with 'tko' and 'parse'.
    """
    relative_parts = ('tko', 'parse')
    return os.path.join(install_dir, *relative_parts)
# Look up a 'parser_path' function in autotest_lib.scheduler.site_monitor_db,
# handing _parser_path_default to utils.import_site_function as the last
# argument (presumably the fallback used when no site version exists --
# confirm against utils.import_site_function).
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
# Resolved once at import time against the drones' install directory.
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
73
mbligh36768f02008-02-22 18:28:33 +000074
def _site_init_monitor_db_dummy():
    """
    No-op stand-in passed to utils.import_site_function for the
    'site_init_monitor_db' hook.

    @returns A new, empty dict.
    """
    return dict()
77
78
def _verify_default_drone_set_exists():
    """
    Check that a default drone set is named whenever drone sets are enabled.

    @raises scheduler_lib.SchedulerError: if drone sets are enabled and
            models.DroneSet.default_drone_set_name() is empty.
    """
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise scheduler_lib.SchedulerError(
            'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000084
85
def _sanity_check():
    """
    Validate the configuration before the scheduler starts.

    Currently the only check is that a default drone set exists when drone
    sets are enabled; that check raises if it fails.
    """
    _verify_default_drone_set_exists()
89
90
def main():
    """
    Entry point: run the scheduler, log anything that escapes, and always
    remove the scheduler pid file on the way out.

    SystemExit is passed through untouched; every other exception is logged
    with its traceback and then re-raised.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        # A deliberate exit is not an error worth a stack trace.
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000102
103
def main_without_exception_handling():
    """
    Parse the command line, start the status server and run the dispatcher
    loop until a shutdown is requested.

    Expects exactly one positional argument, the results directory; prints
    the usage text and returns otherwise.  Exits with status 1 when the
    'enable_scheduler' config value is false.

    Exceptions raised while initializing or ticking are reported through
    email_manager and end the loop; teardown then runs.  The teardown only
    touches the drone manager and database manager if initialize() got far
    enough to create them, so an early initialization failure is not masked
    by an AttributeError on None.
    """
    global RESULTS_DIR, _autoserv_path, _testing_mode

    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
            "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
            _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        _autoserv_path = 'autoserv_dummy'
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except Exception:
        email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    # Both globals stay None when initialize() failed before creating them.
    if _drone_manager is not None:
        _drone_manager.shutdown()
    if _db_manager is not None:
        _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000171
172
def handle_signal(signum, frame):
    """
    Signal handler that asks the main loop to stop.

    Only flips the module-level _shutdown flag and logs; the loop in
    main_without_exception_handling() notices the flag on its next check.

    @param signum: Signal number (unused).
    @param frame: Interrupted stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000177
178
def initialize():
    """
    One-time process setup performed before the dispatcher is created.

    Writes the scheduler pid file (exiting with status 1 if
    utils.program_is_alive reports another instance), connects to the
    database, installs the SIGINT/SIGTERM handlers, initializes the
    scheduler models and the drone manager.
    """
    global _db_manager, _db

    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point the scheduler at the stress test database.
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    _db_manager = scheduler_lib.ConnectionManager()
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    for handled_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(handled_signal, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    read_config = global_config.global_config.get_config_value
    # Comma separated list of drone hostnames.
    drone_hostnames = read_config(
            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [name.strip() for name in drone_hostnames.split(',')]
    results_host = read_config(
            scheduler_config.CONFIG_SECTION, 'results_host',
            default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000213
214
def initialize_globals():
    """
    Bind the module-level _drone_manager to drone_manager.instance().
    """
    global _drone_manager
    manager = drone_manager.instance()
    _drone_manager = manager
218
219
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build an autoserv command line via autoserv_utils.

    The results directory is always drone_manager.WORKING_DIRECTORY and the
    autoserv directory is the module-level _autoserv_directory.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - Forwarded unchanged to autoserv_run_job_command.

    @returns The autoserv command line as a list of executable + parameters.
    """
    command_options = dict(
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args,
            job=job,
            queue_entry=queue_entry,
            verbose=verbose)
    return autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines, **command_options)
showard87ba02a2009-04-20 19:37:32 +0000237
238
Simran Basia858a232012-08-21 11:04:37 -0700239class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800240
241
def __init__(self):
    """
    Set up empty agent bookkeeping, the periodic cleanup helpers, the
    debugging switches read from the scheduler config section, and the
    job query manager / host scheduler pair.
    """
    read_config = global_config.global_config.get_config_value
    section = scheduler_config.CONFIG_SECTION

    self._agents = []
    self._last_clean_time = time.time()
    self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, scheduler_config.config.clean_interval_minutes)
    self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
    # Agents indexed by the host ids / queue entry ids they work on.
    self._host_agents = {}
    self._queue_entry_agents = {}
    self._tick_count = 0
    self._last_garbage_stats_time = time.time()
    gc_stats_interval_mins = read_config(
            section, 'gc_stats_interval_mins', type=int, default=6*60)
    self._seconds_between_garbage_stats = 60 * gc_stats_interval_mins
    self._tick_debug = read_config(
            section, 'tick_debug', type=bool, default=False)
    self._extra_debugging = read_config(
            section, 'extra_debugging', type=bool, default=False)

    # If _inline_host_acquisition is set the scheduler will acquire and
    # release hosts against jobs inline, with the tick. Otherwise the
    # scheduler will only focus on jobs that already have hosts, and
    # will not explicitly unlease a host when a job finishes using it.
    self._job_query_manager = query_managers.AFEJobQueryManager()
    if _inline_host_acquisition:
        self._host_scheduler = host_scheduler.BaseHostScheduler()
    else:
        self._host_scheduler = host_scheduler.DummyHostScheduler()
273
mbligh36768f02008-02-22 18:28:33 +0000274
def initialize(self, recover_hosts=True):
    """
    Prepare the dispatcher for its first tick.

    Initializes both cleanup helpers, requeues entries left half-scheduled
    by an interrupted run, and recovers processes.  Host recovery is
    optional.

    @param recover_hosts: When true, also run self._recover_hosts().
    """
    self._periodic_cleanup.initialize()
    self._24hr_upkeep.initialize()

    self._restore_tasks_where_scheduling_was_interrupted()

    # Process recovery is unconditional.
    self._recover_processes()

    if not recover_hosts:
        return
    self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000286
287
def _log_tick_msg(self, msg):
    """
    Emit msg at debug level, but only when tick debugging is switched on.

    @param msg: The message to log.
    """
    if not self._tick_debug:
        return
    logging.debug(msg)
291
292
def _log_extra_msg(self, msg):
    """
    Emit msg at debug level, but only when extra debugging is switched on.

    @param msg: The message to log.
    """
    if not self._extra_debugging:
        return
    logging.debug(msg)
296
297
def _restore_tasks_where_scheduling_was_interrupted(self):
    """
    Restore consistent database state after being interrupted.

    If the scheduler gets interrupted in a disadvantageous moment it might
    leave host queue entries behind that have a host assigned, but not an
    execution subdirectory. This function restores a consistent state by
    rescheduling those entries.

    For every suspect entry, the active, incomplete special tasks on the
    entry's host are marked aborted and detached from their queue entry,
    then the entry itself is requeued.
    """
    # See crosbug.com/334353
    SQL_SUSPECT_ENTRIES_WHERE = ('complete!=1 AND execution_subdir="" AND '
                                 'status!="Queued";')
    queue_entries = scheduler_models.HostQueueEntry.fetch(
            where=SQL_SUSPECT_ENTRIES_WHERE)

    for queue_entry in queue_entries:
        # Aborting special tasks. Decoupling them from the HQE so the
        # HostQueueEntry doesn't get rescheduled because of the aborted
        # and therefore failing SpecialTask.
        # Aborting all special tasks will lead to their hosts getting
        # repaired before releasing them into the ready pool.
        # The result of update() is deliberately not kept: it is not a
        # collection of tasks and nothing below needs it.
        models.SpecialTask.objects.filter(
                host__id=queue_entry.host_id,
                is_active=True,
                is_complete=False).update(is_aborted=True, queue_entry=None)
        queue_entry.requeue()
324
325
def tick(self):
    """
    Run one scheduler cycle.

    Each major step is announced through _log_tick_msg() right before it
    runs (only visible when tick debugging is enabled), so the log shows
    where the tick time goes.  The last two steps are additionally wrapped
    in stats timer clients.  The steps run strictly in the order written
    below; the tick counter is incremented at the very end.
    """
    timer = stats.Timer('scheduler.tick')
    self._log_tick_msg('Calling new tick, starting garbage collection().')
    self._garbage_collection()
    self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
    _drone_manager.trigger_refresh()
    self._log_tick_msg('Calling _run_cleanup().')
    self._run_cleanup()
    self._log_tick_msg('Calling _process_recurring_runs().')
    self._process_recurring_runs()
    self._log_tick_msg('Calling _schedule_delay_tasks().')
    self._schedule_delay_tasks()
    self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
    self._schedule_running_host_queue_entries()
    self._log_tick_msg('Calling _schedule_special_tasks().')
    self._schedule_special_tasks()
    self._log_tick_msg('Calling _schedule_new_jobs().')
    self._schedule_new_jobs()
    # Counterpart of the trigger_refresh() call made at the top of the tick.
    self._log_tick_msg('Calling _drone_manager.sync_refresh().')
    _drone_manager.sync_refresh()
    self._log_tick_msg('Calling _find_aborting().')
    self._find_aborting()
    self._log_tick_msg('Calling _find_aborted_special_tasks().')
    self._find_aborted_special_tasks()
    self._log_tick_msg('Calling _handle_agents().')
    self._handle_agents()
    self._log_tick_msg('Calling _host_scheduler.tick().')
    self._host_scheduler.tick()
    self._log_tick_msg('Calling _drone_manager.execute_actions().')
    _drone_manager.execute_actions()
    self._log_tick_msg('Calling '
                       'email_manager.manager.send_queued_emails().')
    with timer.get_client('email_manager_send_queued_emails'):
        email_manager.manager.send_queued_emails()
    self._log_tick_msg('Calling django.db.reset_queries().')
    with timer.get_client('django_db_reset_queries'):
        django.db.reset_queries()
    self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000369
showard97aed502008-11-04 02:01:24 +0000370
def _run_cleanup(self):
    """
    Give the periodic cleanup, then the 24 hour upkeep, a chance to run.

    Each helper decides for itself (run_cleanup_maybe) whether it is due.
    """
    periodic = self._periodic_cleanup
    periodic.run_cleanup_maybe()
    daily = self._24hr_upkeep
    daily.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000374
mbligh36768f02008-02-22 18:28:33 +0000375
def _garbage_collection(self):
    """
    Force a garbage collection and log collector stats, rate limited.

    Does nothing unless at least self._seconds_between_garbage_stats have
    passed since the previous report.
    """
    cutoff = time.time() - self._seconds_between_garbage_stats
    report_is_due = not (cutoff < self._last_garbage_stats_time)
    if not report_is_due:
        # Don't generate these reports very often.
        return

    self._last_garbage_stats_time = time.time()
    # Force a full level 0 collection (because we can, it doesn't hurt
    # at this interval).
    gc.collect()
    logging.info('Logging garbage collector stats on tick %d.',
                 self._tick_count)
    gc_stats._log_garbage_collector_stats()
389
390
def _register_agent_for_ids(self, agent_dict, object_ids, agent):
    """
    Record agent under every id in object_ids.

    @param agent_dict: Dict mapping an id to the set of agents using it;
            modified in place.
    @param object_ids: Ids to register the agent under.
    @param agent: The agent to add.
    """
    for key in object_ids:
        if key not in agent_dict:
            agent_dict[key] = set()
        agent_dict[key].add(agent)
394
395
def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
    """
    Remove agent from every id in object_ids, dropping ids left empty.

    Every id must already be present in agent_dict (asserted), and the
    agent must be registered under it.

    @param agent_dict: Dict mapping an id to the set of agents using it;
            modified in place.
    @param object_ids: Ids to remove the agent from.
    @param agent: The agent to remove.
    """
    for key in object_ids:
        assert key in agent_dict
        registered = agent_dict[key]
        registered.remove(agent)
        # If an ID has no more active agent associated, there is no need to
        # keep it in the dictionary. Otherwise, scheduler will keep an
        # unnecessarily big dictionary until being restarted.
        if not registered:
            del agent_dict[key]
showard170873e2009-01-07 00:22:26 +0000405
406
def add_agent_task(self, agent_task):
    """
    Wrap agent_task in a new Agent and start tracking it.

    The agent is appended to the dispatcher's agent list, pointed back at
    this dispatcher, and indexed twice: under each of its host ids and
    under each of its queue entry ids.  Nothing here limits how many
    agents may be registered against one host or queue entry.

    @param agent_task: The task object handed to the Agent constructor.
    """
    new_agent = Agent(agent_task)
    self._agents.append(new_agent)
    new_agent.dispatcher = self

    host_ids = new_agent.host_ids
    self._register_agent_for_ids(self._host_agents, host_ids, new_agent)
    entry_ids = new_agent.queue_entry_ids
    self._register_agent_for_ids(self._queue_entry_agents, entry_ids,
                                 new_agent)
mblighd5c95802008-03-05 00:33:46 +0000427
showard170873e2009-01-07 00:22:26 +0000428
def get_agents_for_entry(self, queue_entry):
    """
    Look up the agents registered for a queue entry.

    @param queue_entry: Object whose id is used as the lookup key.
    @returns A new list of the registered agents; empty if there are none.
    """
    entry_id = queue_entry.id
    registered = self._queue_entry_agents.get(entry_id, ())
    return list(registered)
showard170873e2009-01-07 00:22:26 +0000434
435
def host_has_agent(self, host):
    """
    Tell whether any agent is currently registered against this host.

    @param host: Object whose id is used as the lookup key.
    @returns True if at least one agent is registered, False otherwise.
    """
    registered = self._host_agents.get(host.id)
    if registered:
        return True
    return False
mbligh36768f02008-02-22 18:28:33 +0000441
442
def remove_agent(self, agent):
    """
    Stop tracking an agent.

    Removes it from the agent list and from both the host id and queue
    entry id indexes.

    @param agent: The agent to drop; must currently be tracked.
    """
    self._agents.remove(agent)

    host_ids = agent.host_ids
    self._unregister_agent_for_ids(self._host_agents, host_ids, agent)
    entry_ids = agent.queue_entry_ids
    self._unregister_agent_for_ids(self._queue_entry_agents, entry_ids,
                                   agent)
showardec113162008-05-08 00:52:49 +0000449
450
def _host_has_scheduled_special_task(self, host):
    """
    Tell whether the host has a special task that is neither active nor
    complete.

    @param host: Object whose id selects the special tasks to look at.
    @returns True if at least one such SpecialTask exists.
    """
    waiting_tasks = models.SpecialTask.objects.filter(
            host__id=host.id, is_active=False, is_complete=False)
    return bool(waiting_tasks)
455
456
def _recover_processes(self):
    """
    Re-attach the dispatcher to work that was in flight before it started.

    Builds agent tasks for active queue entries and active special tasks,
    registers their pidfiles, refreshes the drone manager, then recovers
    the tasks and the remaining entries/hosts.  The steps run in the order
    written below.
    """
    agent_tasks = self._create_recovery_agent_tasks()
    # Pidfiles are registered before the refresh.
    self._register_pidfiles(agent_tasks)
    _drone_manager.refresh()
    self._recover_tasks(agent_tasks)
    self._recover_pending_entries()
    self._check_for_unrecovered_verifying_entries()
    self._reverify_remaining_hosts()
    # reinitialize drones after killing orphaned processes, since they can
    # leave around files when they die
    _drone_manager.execute_actions()
    _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000469
showard170873e2009-01-07 00:22:26 +0000470
def _create_recovery_agent_tasks(self):
    """
    Collect the agent tasks that need recovering.

    @returns The queue entry agent tasks followed by the agent tasks for
            active special tasks.
    """
    queue_entry_tasks = self._get_queue_entry_agent_tasks()
    active_special_tasks = self._get_special_task_agent_tasks(is_active=True)
    return queue_entry_tasks + active_special_tasks
474
475
def _get_queue_entry_agent_tasks(self):
    """
    Build agent tasks for the host queue entries in an active state.

    Entries in Starting, Running, Gathering, Parsing or Archiving are
    fetched and each one is turned into an AgentTask through
    _get_agent_task_for_queue_entry.  An entry is skipped when it already
    has an agent, or when an agent task created earlier in this call
    already covers it (a task may span several entries).  The number of
    fetched entries is also reported to the 'scheduler.jobs_per_tick'
    gauge.

    @return: A list of AgentTasks.
    """
    # host queue entry statuses handled directly by AgentTasks (Verifying is
    # handled through SpecialTasks, so is not listed here)
    hqe_status = models.HostQueueEntry.Status
    active_statuses = (hqe_status.STARTING,
                       hqe_status.RUNNING,
                       hqe_status.GATHERING,
                       hqe_status.PARSING,
                       hqe_status.ARCHIVING)
    quoted_statuses = ["'%s'" % status for status in active_statuses]
    where_clause = 'status IN (%s)' % ','.join(quoted_statuses)
    queue_entries = scheduler_models.HostQueueEntry.fetch(where=where_clause)
    stats.Gauge('scheduler.jobs_per_tick').send(
            'running', len(queue_entries))

    tasks = []
    covered_entries = set()
    for entry in queue_entries:
        # Skip entries that already have an agent, or that were picked up
        # by a synchronous job earlier in this loop.
        if self.get_agents_for_entry(entry) or entry in covered_entries:
            continue
        task = self._get_agent_task_for_queue_entry(entry)
        tasks.append(task)
        covered_entries.update(task.queue_entries)
    return tasks
showard170873e2009-01-07 00:22:26 +0000514
515
def _get_special_task_agent_tasks(self, is_active=False):
    """
    Build agent tasks for the incomplete special tasks.

    @param is_active: Select special tasks whose is_active flag has this
            value.
    @returns A list with one AgentTask per matching SpecialTask.
    """
    matching_tasks = models.SpecialTask.objects.filter(
            is_active=is_active, is_complete=False)
    agent_tasks = []
    for special_task in matching_tasks:
        agent_tasks.append(
                self._get_agent_task_for_special_task(special_task))
    return agent_tasks
521
522
def _get_agent_task_for_queue_entry(self, queue_entry):
    """
    Pick and construct the AgentTask matching an active HostQueueEntry.

    The choice depends only on the entry's status: Starting/Running map
    to a (hostless) queue task, Gathering/Parsing/Archiving to the
    corresponding post-job task.

    @param queue_entry: a HostQueueEntry
    @return: an AgentTask to run the queue entry
    @raises scheduler_lib.SchedulerError: for any other status.
    """
    group_entries = queue_entry.job.get_group_entries(queue_entry)
    self._check_for_duplicate_host_entries(group_entries)

    hqe_status = models.HostQueueEntry.Status
    current_status = queue_entry.status

    if current_status in (hqe_status.STARTING, hqe_status.RUNNING):
        if queue_entry.is_hostless():
            return HostlessQueueTask(queue_entry=queue_entry)
        return QueueTask(queue_entries=group_entries)

    postjob_task_classes = (
            (hqe_status.GATHERING, postjob_task.GatherLogsTask),
            (hqe_status.PARSING, postjob_task.FinalReparseTask),
            (hqe_status.ARCHIVING, postjob_task.ArchiveResultsTask))
    for handled_status, task_class in postjob_task_classes:
        if current_status == handled_status:
            return task_class(queue_entries=group_entries)

    raise scheduler_lib.SchedulerError(
            '_get_agent_task_for_queue_entry got entry with '
            'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000548
549
def _check_for_duplicate_host_entries(self, task_entries):
    """
    Assert that no host used by these entries already has an agent.

    Entries without a host, and entries in Parsing or Archiving, are not
    checked.

    @param task_entries: Iterable of HostQueueEntry objects.
    """
    statuses_without_host = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
    for task_entry in task_entries:
        if task_entry.host is None:
            continue
        if task_entry.status in statuses_without_host:
            continue
        self._assert_host_has_no_agent(task_entry)
558
559
def _assert_host_has_no_agent(self, entry):
    """
    Refuse to schedule onto a host that already has an agent.

    @param entry: a HostQueueEntry or a SpecialTask
    @raises scheduler_lib.SchedulerError: if entry.host has an agent; the
            message names one of the agents' tasks.
    """
    if not self.host_has_agent(entry.host):
        return
    existing_agent = tuple(self._host_agents.get(entry.host.id))[0]
    raise scheduler_lib.SchedulerError(
            'While scheduling %s, host %s already has a host agent %s'
            % (entry, entry.host, existing_agent.task))
569
570
def _get_agent_task_for_special_task(self, special_task):
    """
    Construct the AgentTask that runs the given SpecialTask.

    The agent task class is chosen by comparing special_task.task with
    each candidate class's TASK_TYPE.  The new agent task is only
    returned; this method does not register it with the dispatcher, the
    caller does that (for example through add_agent_task).

    @param special_task: a models.SpecialTask instance
    @returns an AgentTask to run this SpecialTask
    @raises scheduler_lib.SchedulerError: if the task's host already has an
            agent, or if no agent task class handles special_task.task.
    """
    self._assert_host_has_no_agent(special_task)

    special_agent_task_classes = (prejob_task.CleanupTask,
                                  prejob_task.VerifyTask,
                                  prejob_task.RepairTask,
                                  prejob_task.ResetTask,
                                  prejob_task.ProvisionTask)

    for agent_task_class in special_agent_task_classes:
        if agent_task_class.TASK_TYPE == special_task.task:
            return agent_task_class(task=special_task)

    # Format the task into the message; passing it as a second positional
    # argument made the exception render as a tuple.
    raise scheduler_lib.SchedulerError(
            'No AgentTask class for task %s' % str(special_task))
showardd1195652009-12-08 22:21:02 +0000601
602
def _register_pidfiles(self, agent_tasks):
    """
    Ask every agent task to register the pidfiles it needs.

    @param agent_tasks: Iterable of agent tasks.
    """
    for task in agent_tasks:
        register = task.register_necessary_pidfiles
        register()
606
607
def _recover_tasks(self, agent_tasks):
    """
    Recover the given agent tasks and add them to the dispatcher.

    Every process claimed by a recovered task is removed from the set of
    orphaned autoserv processes reported by the drone manager; whatever is
    left over is handed to _check_for_remaining_orphan_processes.

    @param agent_tasks: an iterable of AgentTask objects to recover.
    """
    unclaimed = _drone_manager.get_orphaned_autoserv_processes()

    for task in agent_tasks:
        task.recover()
        monitor = task.monitor
        if monitor and monitor.has_process():
            unclaimed.discard(monitor.get_process())
        self.add_agent_task(task)

    self._check_for_remaining_orphan_processes(unclaimed)
showarded2afea2009-07-07 20:54:07 +0000618
619
def _get_unassigned_entries(self, status):
    """
    Yield the host queue entries in the given status that have no agent.

    @param status: the HostQueueEntry status to look for.
    @returns a generator over the matching HostQueueEntry objects.
    """
    where_clause = "status = '%s'" % status
    for hqe in scheduler_models.HostQueueEntry.fetch(where=where_clause):
        # The status can change during iteration, e.g., if job.run()
        # sets a group of queue entries to Starting, so re-check it here.
        if hqe.status != status:
            continue
        if self.get_agents_for_entry(hqe):
            continue
        yield hqe
627
628
showard6878e8b2009-07-20 22:37:45 +0000629 def _check_for_remaining_orphan_processes(self, orphans):
630 if not orphans:
631 return
632 subject = 'Unrecovered orphan autoserv processes remain'
633 message = '\n'.join(str(process) for process in orphans)
634 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000635
636 die_on_orphans = global_config.global_config.get_config_value(
637 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
638
639 if die_on_orphans:
640 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000641
showard170873e2009-01-07 00:22:26 +0000642
def _recover_pending_entries(self):
    """
    Call on_pending() on every Pending host queue entry without an agent.
    """
    pending_status = models.HostQueueEntry.Status.PENDING
    for hqe in self._get_unassigned_entries(pending_status):
        logging.info('Recovering Pending entry %s', hqe)
        hqe.on_pending()
648
649
def _check_for_unrecovered_verifying_entries(self):
    """
    Fail if a Verifying host queue entry has no pre-job task left to run.

    An entry counts as recovered when at least one incomplete Cleanup or
    Verify SpecialTask is attached to it.

    @raises scheduler_lib.SchedulerError: listing every Verifying entry
            that has no such task.
    """
    verifying_status = models.HostQueueEntry.Status.VERIFYING
    prejob_types = (models.SpecialTask.Task.CLEANUP,
                    models.SpecialTask.Task.VERIFY)

    def _lacks_pending_task(hqe):
        pending = models.SpecialTask.objects.filter(
                task__in=prejob_types,
                queue_entry__id=hqe.id,
                is_complete=False)
        return pending.count() == 0

    verifying_entries = scheduler_models.HostQueueEntry.fetch(
            where='status = "%s"' % verifying_status)
    stranded = [hqe for hqe in verifying_entries
                if _lacks_pending_task(hqe)]
    if not stranded:
        return

    listing = '\n'.join(str(hqe) for hqe in stranded)
    raise scheduler_lib.SchedulerError(
            '%d unrecovered verifying host queue entries:\n%s' %
            (len(stranded), listing))
showard170873e2009-01-07 00:22:26 +0000668
669
def _schedule_special_tasks(self):
    """
    Execute queued SpecialTasks that are ready to run on idle hosts.

    Special tasks include PreJobTasks like verify, reset and cleanup.
    They are created through _schedule_new_jobs and associated with a hqe.
    This method translates each SpecialTask into the appropriate AgentTask
    and adds it to the dispatcher, so _handle_agents can execute it. Tasks
    whose host already has an agent are skipped.
    """
    # When the host scheduler is responsible for acquisition we only want
    # to run tasks with leased hosts. All hqe tasks will already have
    # leased hosts, and we don't want to run frontend tasks till the host
    # scheduler has vetted the assignment. Note that this doesn't include
    # frontend tasks with hosts leased by other active hqes.
    leased_only = not _inline_host_acquisition
    queued_tasks = self._job_query_manager.get_prioritized_special_tasks(
            only_tasks_with_leased_hosts=leased_only)
    for special_task in queued_tasks:
        if not self.host_has_agent(special_task.host):
            self.add_agent_task(
                    self._get_agent_task_for_special_task(special_task))
showard1ff7b2e2009-05-15 23:17:18 +0000690
691
showard170873e2009-01-07 00:22:26 +0000692 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000693 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000694 # should never happen
showarded2afea2009-07-07 20:54:07 +0000695 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000696 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000697 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700698 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000699 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000700
701
def _reverify_hosts_where(self, where,
                          print_message='Reverifying host %s'):
    """
    Queue a Cleanup SpecialTask for every matching host that needs one.

    Only unlocked, valid hosts are considered. Hosts that already have an
    agent, or already have a special task scheduled, are left alone.

    @param where: SQL condition selecting the hosts, ANDed onto the
            locked/invalid filter.
    @param print_message: log format string taking the hostname; a false
            value suppresses the log line.
    """
    full_where = 'locked = 0 AND invalid = 0 AND ' + where
    for host in scheduler_models.Host.fetch(where=full_where):
        # Skip hosts that were already recovered in some way, and hosts
        # that will have a special task scheduled on the next cycle.
        already_covered = (self.host_has_agent(host) or
                           self._host_has_scheduled_special_task(host))
        if already_covered:
            continue
        if print_message:
            logging.info(print_message, host.hostname)
        afe_host = models.Host.objects.get(id=host.id)
        models.SpecialTask.objects.create(
                task=models.SpecialTask.Task.CLEANUP,
                host=afe_host)
mbligh36768f02008-02-22 18:28:33 +0000717
718
jadmanski0afbb632008-06-06 21:10:57 +0000719 def _recover_hosts(self):
720 # recover "Repair Failed" hosts
721 message = 'Reverifying dead host %s'
722 self._reverify_hosts_where("status = 'Repair Failed'",
723 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000724
725
def _refresh_pending_queue_entries(self):
    """
    Look up the pending HostQueueEntries through the job query manager.

    Only hostless entries are requested unless inline host acquisition is
    enabled.

    @returns A list of pending HostQueueEntries sorted in priority order,
            or an empty list if there are none.
    """
    hostless_only = not _inline_host_acquisition
    pending = self._job_query_manager.get_pending_queue_entries(
            only_hostless=hostless_only)
    return pending or []
738
739
def _schedule_hostless_job(self, queue_entry):
    """Schedule a hostless (suite) job.

    Wraps the entry in a HostlessQueueTask, adds that task to the
    dispatcher and then moves the entry to Starting.

    @param queue_entry: The queue_entry representing the hostless job.
    """
    hostless_task = HostlessQueueTask(queue_entry)
    self.add_agent_task(hostless_task)
    queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000747
748
beepscc9fc702013-12-02 12:45:38 -0800749 def _schedule_host_job(self, host, queue_entry):
750 """Schedules a job on the given host.
751
752 1. Assign the host to the hqe, if it isn't already assigned.
753 2. Create a SpecialAgentTask for the hqe.
754 3. Activate the hqe.
755
756 @param queue_entry: The job to schedule.
757 @param host: The host to schedule the job on.
758 """
759 if self.host_has_agent(host):
760 host_agent_task = list(self._host_agents.get(host.id))[0].task
761 subject = 'Host with agents assigned to an HQE'
762 message = ('HQE: %s assigned host %s, but the host has '
763 'agent: %s for queue_entry %s. The HQE '
beepsf7d35162014-02-13 16:42:13 -0800764 'will have to try and acquire a host next tick ' %
beepscc9fc702013-12-02 12:45:38 -0800765 (queue_entry, host.hostname, host_agent_task,
766 host_agent_task.queue_entry))
767 email_manager.manager.enqueue_notify_email(subject, message)
beepscc9fc702013-12-02 12:45:38 -0800768 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700769 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800770
771
def _schedule_new_jobs(self):
    """
    Find any new HQEs and call schedule_pre_job_tasks for it.

    This involves setting the status of the HQE and creating a row in the
    db corresponding to the special task, through
    scheduler_models._queue_special_task. The new db row is then added as
    an agent to the dispatcher through _schedule_special_tasks and
    scheduled for execution on the drone through _handle_agents.

    Hostless entries are scheduled directly. The remaining entries are
    handed to the host scheduler, and every returned assignment goes
    through _schedule_host_job. Per-tick counts are sent to the
    'scheduler.jobs_per_tick' gauge.
    """
    pending_entries = self._refresh_pending_queue_entries()

    gauge_key = 'scheduler.jobs_per_tick'
    logging.debug('Processing %d queue_entries', len(pending_entries))

    hostless_count = 0
    host_jobs = []
    for hqe in pending_entries:
        if hqe.is_hostless():
            self._schedule_hostless_job(hqe)
            hostless_count += 1
        else:
            host_jobs.append(hqe)

    stats.Gauge(gauge_key).send('new_hostless_jobs', hostless_count)
    if not host_jobs:
        return

    if not _inline_host_acquisition:
        # Entries needing hosts are not expected in this mode; notify
        # about it but acquire hosts for them anyway.
        message = ('Found %s jobs that need hosts though '
                   '_inline_host_acquisition=%s. Will acquire hosts.' %
                   ([str(job) for job in host_jobs],
                    _inline_host_acquisition))
        email_manager.manager.enqueue_notify_email(
                'Processing unexpected host acquisition requests', message)

    assigned_count = 0
    for assignment in self._host_scheduler.find_hosts_for_jobs(host_jobs):
        self._schedule_host_job(assignment.host, assignment.job)
        assigned_count += 1

    stats.Gauge(gauge_key).send('new_jobs_with_hosts', assigned_count)
    stats.Gauge(gauge_key).send('new_jobs_without_hosts',
                                len(host_jobs) - assigned_count)
showardb95b1bd2008-08-15 18:11:04 +0000817
818
showard8cc058f2009-09-08 16:26:33 +0000819 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700820 """
821 Adds agents to the dispatcher.
822
823 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
824 QueueTask for example, will have a job with a control file, and
825 the agent will have methods that poll, abort and check if the queue
826 task is finished. The dispatcher runs the agent_task, as well as
827 other agents in it's _agents member, through _handle_agents, by
828 calling the Agents tick().
829
830 This method creates an agent for each HQE in one of (starting, running,
831 gathering, parsing, archiving) states, and adds it to the dispatcher so
832 it is handled by _handle_agents.
833 """
showardd1195652009-12-08 22:21:02 +0000834 for agent_task in self._get_queue_entry_agent_tasks():
835 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000836
837
def _schedule_delay_tasks(self):
    """
    Add the delayed callback task of every Waiting host queue entry.

    Each entry's job is asked for a delayed callback task; entries for
    which the job returns nothing are skipped.
    """
    waiting_status = models.HostQueueEntry.Status.WAITING
    waiting_entries = scheduler_models.HostQueueEntry.fetch(
            where='status = "%s"' % waiting_status)
    for hqe in waiting_entries:
        delay_task = hqe.job.schedule_delayed_callback_task(hqe)
        if delay_task:
            self.add_agent_task(delay_task)
showard8cc058f2009-09-08 16:26:33 +0000844
845
def _find_aborting(self):
    """
    Looks through the afe_host_queue_entries for an aborted entry.

    The aborted bit is set on an HQE in many ways, the most common
    being when a user requests an abort through the frontend, which
    results in an rpc from the afe to abort_host_queue_entries.

    Every aborted, incomplete entry has its agents and itself aborted;
    afterwards each affected job is asked to stop if necessary.
    """
    affected_jobs = set()
    aborted_entries = scheduler_models.HostQueueEntry.fetch(
            where='aborted=1 and complete=0')
    for hqe in aborted_entries:
        logging.info('Aborting %s', hqe)

        # The task would have started off with both is_complete and
        # is_active = False. Aborted tasks are neither active nor complete.
        # For all currently active tasks this will happen through the agent,
        # but we need to manually update the special tasks that haven't
        # started yet, because they don't have agents.
        unstarted_tasks = models.SpecialTask.objects.filter(
                is_active=False, queue_entry_id=hqe.id)
        unstarted_tasks.update(is_complete=True)

        for agent in self.get_agents_for_entry(hqe):
            agent.abort()
        hqe.abort(self)
        affected_jobs.add(hqe.job)

    logging.debug('Aborting %d jobs this tick.', len(affected_jobs))
    for job in affected_jobs:
        job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000874
875
def _find_aborted_special_tasks(self):
    """
    Find SpecialTasks that have been marked for abortion.

    Poll the database looking for SpecialTasks that are active
    and have been marked for abortion, then abort them.
    """
    # The completed and active bits are very important when it comes
    # to scheduler correctness. The active bit is set through the prolog
    # of a special task, and reset through the cleanup method of the
    # SpecialAgentTask. The cleanup is called both through the abort and
    # epilog. The complete bit is set in several places, and in general
    # a hanging job will have is_active=1 is_complete=0, while a special
    # task which completed will have is_active=0 is_complete=1. To check
    # aborts we directly check active because the complete bit is set in
    # several places, including the epilog of agent tasks.
    tasks_to_abort = models.SpecialTask.objects.filter(is_active=True,
                                                       is_aborted=True)
    for special_task in tasks_to_abort:
        # There are 2 ways to get the agent associated with a task,
        # through the host and through the hqe. A special task
        # always needs a host, but doesn't always need a hqe.
        host_agents = self._host_agents.get(special_task.host.id, [])
        for agent in host_agents:
            if not isinstance(agent.task, agent_task.SpecialAgentTask):
                continue
            # The epilog performs critical actions such as
            # queueing the next SpecialTask, requeuing the
            # hqe etc, however it doesn't actually kill the
            # monitor process and set the 'done' bit. Epilogs
            # assume that the job failed, and that the monitor
            # process has already written an exit code. The
            # done bit is a necessary condition for
            # _handle_agents to schedule any more special
            # tasks against the host, and it must be set
            # in addition to is_active, is_complete and success.
            agent.abort()
            agent.task.epilog()
beeps8bb1f7d2013-08-05 01:30:09 -0700913
914
showard324bf812009-01-20 23:23:38 +0000915 def _can_start_agent(self, agent, num_started_this_cycle,
916 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000917 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000918 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000919 return True
920 # don't allow any nonzero-process agents to run after we've reached a
921 # limit (this avoids starvation of many-process agents)
922 if have_reached_limit:
923 return False
924 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000925 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000926 agent.task.owner_username,
927 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000928 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000929 return False
930 # if a single agent exceeds the per-cycle throttling, still allow it to
931 # run when it's the first agent in the cycle
932 if num_started_this_cycle == 0:
933 return True
934 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +0000935 if (num_started_this_cycle + agent.task.num_processes >
936 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000937 return False
938 return True
939
940
def _handle_agents(self):
    """
    Handles agents of the dispatcher.

    Appropriate Agents are added to the dispatcher through
    _schedule_running_host_queue_entries. These agents each
    have a task. This method runs the agent's task through
    agent.tick() leading to:
        agent.start
            prolog -> AgentTasks prolog
                      For each queue entry:
                        sets host status/status to Running
                        set started_on in afe_host_queue_entries
            run    -> AgentTasks run
                      Creates PidfileRunMonitor
                      Queues the autoserv command line for this AgentTask
                      via the drone manager. These commands are executed
                      through the drone managers execute actions.
            poll   -> AgentTasks/BaseAgentTask poll
                      checks the monitors exit_code.
                      Executes epilog if task is finished.
                      Executes AgentTasks _finish_task
                finish_task is usually responsible for setting the status
                of the HQE/host, and updating its active and complete fields.

        agent.is_done
            Removes the agent from the dispatcher's _agents queue.
            is_done checks the finished bit on the agent, which is
            set based on the Agent's task. During the agent's poll
            we check to see if the monitor process has exited in
            its finish method, and set the success member of the
            task based on this exit code.

    Agents that have not started are only ticked when _can_start_agent
    allows it; once one is refused, the limit flag stays set for the rest
    of this pass.
    """
    started_processes = 0
    limit_reached = False
    logging.debug('Handling %d Agents', len(self._agents))
    # iterate over copy, so we can remove agents during iteration
    for agent in list(self._agents):
        self._log_extra_msg('Processing Agent with Host Ids: %s and '
                            'queue_entry ids:%s' % (agent.host_ids,
                                                    agent.queue_entry_ids))
        if not agent.started:
            may_start = self._can_start_agent(agent, started_processes,
                                              limit_reached)
            if not may_start:
                limit_reached = True
                logging.debug('Reached Limit of allowed running Agents.')
                continue
            started_processes += agent.task.num_processes
            self._log_extra_msg('Starting Agent')

        agent.tick()
        self._log_extra_msg('Agent tick completed.')

        if agent.is_done():
            self._log_extra_msg("Agent finished")
            self.remove_agent(agent)

    logging.info('%d running processes. %d added this cycle.',
                 _drone_manager.total_running_processes(),
                 started_processes)
mbligh36768f02008-02-22 18:28:33 +0000998
999
def _process_recurring_runs(self):
    """
    Create jobs for all recurring runs whose start date has passed.

    For every due RecurringRun a new job is created from the run's template
    job. A failure to create the job is logged and does not stop the
    processing of the remaining runs. Afterwards the run is deleted if this
    was its last iteration (loop_count == 1); otherwise, unless it loops
    forever (loop_count == 0), its start date is advanced by loop_period
    seconds and its loop count decremented.
    """
    due_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
    for rrun in due_runs:
        # Create job from template
        job = rrun.job
        info = rpc_utils.get_job_info(job)
        options = job.get_object_dict()

        host_objects = info['hosts']
        one_time_hosts = info['one_time_hosts']
        metahost_objects = info['meta_hosts']
        atomic_group = info['atomic_group']

        for host in one_time_hosts or []:
            this_host = models.Host.create_one_time_host(host.hostname)
            host_objects.append(this_host)

        try:
            rpc_utils.create_new_job(owner=rrun.owner.login,
                                     options=options,
                                     host_objects=host_objects,
                                     metahost_objects=metahost_objects,
                                     atomic_group=atomic_group)
        except Exception as ex:
            # Deliberately best-effort: one broken template must not stop
            # the remaining recurring runs from being processed.
            logging.exception(ex)
            #TODO send email

        if rrun.loop_count == 1:
            rrun.delete()
        elif rrun.loop_count != 0: # if not infinite loop
            # calculate new start_date
            difference = datetime.timedelta(seconds=rrun.loop_period)
            rrun.start_date = rrun.start_date + difference
            rrun.loop_count -= 1
            rrun.save()
1039
1040
# Resolve the site-specific dispatcher class from site_monitor_db, passing
# BaseDispatcher as the base class (presumably the fallback when no site
# module exists -- see utils.import_site_class).
SiteDispatcher = utils.import_site_class(
        __file__,
        'autotest_lib.scheduler.site_monitor_db',
        'SiteDispatcher',
        BaseDispatcher)


class Dispatcher(SiteDispatcher):
    """The dispatcher class used by the scheduler; adds nothing itself."""
1047
1048
class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task. An agent wraps
    around an AgentTask mainly to associate the AgentTask with the queue_entry
    and host ids.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested.  The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: An instance of an AgentTask.
        """
        self.task = task
        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        # This is filled in by Dispatcher.add_agent()
        self.dispatcher = None

        self.started = False
        self.finished = False


    def tick(self):
        """
        Mark the agent as started and poll its task unless already finished.

        The agent becomes finished as soon as the task reports it is done.
        """
        self.started = True
        if self.finished:
            return
        task = self.task
        task.poll()
        if task.is_done():
            self.finished = True


    def is_done(self):
        """
        @returns True once the agent has finished, False before that.
        """
        return self.finished


    def abort(self):
        """
        Forward an abort request to the task, if there is one.

        The agent only becomes finished if the task actually aborted.
        """
        task = self.task
        if not task:
            return
        task.abort()
        # tasks can choose to ignore aborts
        if task.aborted:
            self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001104
showardd3dc1992009-04-22 21:01:40 +00001105
beeps5e2bb4a2013-10-28 11:26:45 -07001106class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001107 """
1108 Common functionality for QueueTask and HostlessQueueTask
1109 """
1110 def __init__(self, queue_entries):
1111 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001112 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001113 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001114
1115
showard73ec0442009-02-07 02:05:20 +00001116 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001117 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001118
1119
jamesrenc44ae992010-02-19 00:12:54 +00001120 def _write_control_file(self, execution_path):
1121 control_path = _drone_manager.attach_file_to_execution(
1122 execution_path, self.job.control_file)
1123 return control_path
1124
1125
Aviv Keshet308e7362013-05-21 14:43:16 -07001126 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001127 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001128 execution_path = self.queue_entries[0].execution_path()
1129 control_path = self._write_control_file(execution_path)
1130 hostnames = ','.join(entry.host.hostname
1131 for entry in self.queue_entries
1132 if not entry.is_hostless())
1133
1134 execution_tag = self.queue_entries[0].execution_tag()
1135 params = _autoserv_command_line(
1136 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001137 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001138 _drone_manager.absolute_path(control_path)],
1139 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001140 if self.job.is_image_update_job():
1141 params += ['--image', self.job.update_image_path]
1142
jamesrenc44ae992010-02-19 00:12:54 +00001143 return params
showardd1195652009-12-08 22:21:02 +00001144
1145
1146 @property
1147 def num_processes(self):
1148 return len(self.queue_entries)
1149
1150
1151 @property
1152 def owner_username(self):
1153 return self.job.owner
1154
1155
1156 def _working_directory(self):
1157 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001158
1159
jadmanski0afbb632008-06-06 21:10:57 +00001160 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001161 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001162 keyval_dict = self.job.keyval_dict()
1163 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001164 group_name = self.queue_entries[0].get_group_name()
1165 if group_name:
1166 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001167 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001168 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001169 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001170 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001171
1172
showard35162b02009-03-03 02:17:30 +00001173 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001174 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001175 _drone_manager.write_lines_to_file(error_file_path,
1176 [_LOST_PROCESS_ERROR])
1177
1178
showardd3dc1992009-04-22 21:01:40 +00001179 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001180 if not self.monitor:
1181 return
1182
showardd9205182009-04-27 20:09:55 +00001183 self._write_job_finished()
1184
showard35162b02009-03-03 02:17:30 +00001185 if self.monitor.lost_process:
1186 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001187
jadmanskif7fa2cc2008-10-01 14:13:23 +00001188
showardcbd74612008-11-19 21:42:02 +00001189 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001190 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001191 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001192 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001193 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001194
1195
jadmanskif7fa2cc2008-10-01 14:13:23 +00001196 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001197 if not self.monitor or not self.monitor.has_process():
1198 return
1199
jadmanskif7fa2cc2008-10-01 14:13:23 +00001200 # build up sets of all the aborted_by and aborted_on values
1201 aborted_by, aborted_on = set(), set()
1202 for queue_entry in self.queue_entries:
1203 if queue_entry.aborted_by:
1204 aborted_by.add(queue_entry.aborted_by)
1205 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1206 aborted_on.add(t)
1207
1208 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001209 # TODO(showard): this conditional is now obsolete, we just need to leave
1210 # it in temporarily for backwards compatibility over upgrades. delete
1211 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001212 assert len(aborted_by) <= 1
1213 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001214 aborted_by_value = aborted_by.pop()
1215 aborted_on_value = max(aborted_on)
1216 else:
1217 aborted_by_value = 'autotest_system'
1218 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001219
showarda0382352009-02-11 23:36:43 +00001220 self._write_keyval_after_job("aborted_by", aborted_by_value)
1221 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001222
showardcbd74612008-11-19 21:42:02 +00001223 aborted_on_string = str(datetime.datetime.fromtimestamp(
1224 aborted_on_value))
1225 self._write_status_comment('Job aborted by %s on %s' %
1226 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001227
1228
def abort(self):
    """
    Abort this task.

    Runs the parent class's abort handling, then logs the abort via
    _log_abort() and finally calls _finish_task().
    """
    super(AbstractQueueTask, self).abort()
    self._log_abort()
    self._finish_task()
showard21baa452008-10-21 00:08:39 +00001233
1234
def epilog(self):
    """
    Run the parent class's epilog, then call _finish_task().
    """
    super(AbstractQueueTask, self).epilog()
    self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001238
1239
class QueueTask(AbstractQueueTask):
    """Queue task whose queue entries each carry a host."""

    def __init__(self, queue_entries):
        """
        @param queue_entries: the host queue entries handled by this task.
        """
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """
        Check entry/host statuses, run the parent prolog, then write each
        host's keyvals and mark the host as Running and dirty.
        """
        hqe_status = models.HostQueueEntry.Status
        host_status = models.Host.Status
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(hqe_status.STARTING,
                                      hqe_status.RUNNING),
                allowed_host_statuses=(host_status.PENDING,
                                       host_status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            host = entry.host
            self._write_host_keyvals(host)
            host.set_status(host_status.RUNNING)
            host.update_field('dirty', 1)


    def _finish_task(self):
        """
        Run the parent _finish_task, then move every queue entry to
        Gathering while setting its host's status to Running.
        """
        super(QueueTask, self)._finish_task()

        gathering = models.HostQueueEntry.Status.GATHERING
        running = models.Host.Status.RUNNING
        for entry in self.queue_entries:
            entry.set_status(gathering)
            entry.host.set_status(running)


    def _command_line(self):
        """
        @returns the parent class's command line with
                --verify_job_repo_url appended.
        """
        extra_args = ['--verify_job_repo_url']
        return super(QueueTask, self)._command_line() + extra_args
1273
1274
class HostlessQueueTask(AbstractQueueTask):
    """
    Queue task wrapping exactly one queue entry, executed under the
    'hostless' execution subdirectory.
    """

    def __init__(self, queue_entry):
        """
        @param queue_entry: the single host queue entry for this task.
        """
        entries = [queue_entry]
        super(HostlessQueueTask, self).__init__(entries)
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        """
        Set the entry's execution_subdir to 'hostless', then run the
        parent prolog.
        """
        entry = self.queue_entries[0]
        entry.update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        """
        Run the parent _finish_task, then move the entry to Parsing unless
        it is still in Starting status.
        """
        super(HostlessQueueTask, self)._finish_task()

        # A job always enters the database in Starting status. On each
        # tick the scheduler looks at all Starting jobs and decides which
        # can be started. When a limit is hit (e.g.
        # max_hostless_jobs_per_drone or max_processes_started_per_cycle)
        # the jobs are left in Starting; otherwise they are switched to
        # Running and an autoserv process is started on a drone for each.
        #
        # So an entry that is still Starting has no process yet, and there
        # is nothing to parse or collect. Skipping it also avoids an
        # exception in the scheduler, because execution_subdir has no
        # value for such an entry.
        entry = self.queue_entries[0]
        if entry.status == models.HostQueueEntry.Status.STARTING:
            return
        entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001304
1305
# Invoke main() only when this file is executed directly, not when it is
# imported as a module.
if __name__ == '__main__':
    main()