#!/usr/bin/python -u
#pylint: disable-msg=C0111

"""
Autotest scheduler
"""


import datetime, optparse, os, signal
import sys, time, traceback, urllib
import logging, gc

import common
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.server import autoserv_utils
from autotest_lib.site_utils.graphite import stats

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_directory = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server')
_autoserv_path = os.path.join(_autoserv_directory, 'autoserv')
_testing_mode = False
_drone_manager = None

def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
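# Note on the pattern above: import_site_function() looks for a function
# named 'parser_path' in autotest_lib.scheduler.site_monitor_db and falls
# back to _parser_path_default when the site module (or the function) is
# absent.  A hypothetical site override would look roughly like this, in
# site_monitor_db.py (illustrative sketch only, not part of this module):
#
#     def parser_path(install_dir):
#         return os.path.join(install_dir, 'site_tko', 'parse')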


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60


def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
            logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
                DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)

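# Illustrative sketch (not executed anywhere in this module): callers build
# an autoserv command line and hand it to a PidfileRunMonitor.  The exact
# flags are produced by autoserv_utils.autoserv_run_job_command(), so the
# expanded command shown in the comment below is an assumption for
# illustration only, not a guaranteed format.
#
#     cmd = _autoserv_command_line('host1,host2',
#                                  extra_args=['--some-extra-flag'],
#                                  verbose=False)
#     # cmd is a list of strings, roughly:
#     #     [_autoserv_path, ..., '-m', 'host1,host2', ..., '--some-extra-flag']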
class BaseDispatcher(object):


    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def _log_extra_msg(self, msg):
        if self._extra_debugging:
            logging.debug(msg)


    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                           % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where='locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)



    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
                joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
                where='NOT complete AND NOT active AND status="Queued"',
                order_by='afe_jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        logging.debug('Processing %d queue_entries', len(queue_entries))
        for queue_entry in queue_entries:
            self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._log_extra_msg('Scheduling hostless job.')
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)


    def _schedule_running_host_queue_entries(self):
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
                            queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True

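    # Worked example of the per-cycle throttling above, with made-up numbers:
    # if max_processes_started_per_cycle is 20 and an agent's task needs 6
    # processes, _can_start_agent() returns True while num_started_this_cycle
    # is at most 14 (14 + 6 <= 20), and also when it is exactly 0 (the
    # first-agent exception), but returns False once 15 or more processes
    # have already been started (15 + 6 > 20); _handle_agents() then sets
    # have_reached_limit so remaining nonzero-process agents are deferred
    # until the next cycle.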

    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_cycle += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes. %d added this cycle.',
                     _drone_manager.total_running_processes(),
                     num_started_this_cycle)


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


SiteDispatcher = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    pass


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

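    # Typical usage, as a rough sketch (the paths and counts below are
    # illustrative assumptions, not taken from real jobs):
    #
    #     monitor = PidfileRunMonitor()
    #     monitor.run(command, '/some/results/dir', num_processes=1,
    #                 log_file='/some/results/dir/debug.log')
    #     # ...or, when recovering an autoserv that is already running:
    #     monitor.attach_to_existing_process('/some/results/dir')
    #     # ...then poll until an exit status appears:
    #     if monitor.exit_code() is not None:
    #         failed = monitor.num_tests_failed()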
showard170873e2009-01-07 00:22:26 +0000894 class _PidfileException(Exception):
895 """
896 Raised when there's some unexpected behavior with the pid file, but only
897 used internally (never allowed to escape this class).
898 """
mbligh36768f02008-02-22 18:28:33 +0000899
900
showard170873e2009-01-07 00:22:26 +0000901 def __init__(self):
showard35162b02009-03-03 02:17:30 +0000902 self.lost_process = False
showard170873e2009-01-07 00:22:26 +0000903 self._start_time = None
904 self.pidfile_id = None
905 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +0000906
907
showard170873e2009-01-07 00:22:26 +0000908 def _add_nice_command(self, command, nice_level):
909 if not nice_level:
910 return command
911 return ['nice', '-n', str(nice_level)] + command
912
913
914 def _set_start_time(self):
915 self._start_time = time.time()
916
917
showard418785b2009-11-23 20:19:59 +0000918 def run(self, command, working_directory, num_processes, nice_level=None,
919 log_file=None, pidfile_name=None, paired_with_pidfile=None,
jamesren76fcf192010-04-21 20:39:50 +0000920 username=None, drone_hostnames_allowed=None):
showard170873e2009-01-07 00:22:26 +0000921 assert command is not None
922 if nice_level is not None:
923 command = ['nice', '-n', str(nice_level)] + command
924 self._set_start_time()
925 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +0000926 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +0000927 num_processes=num_processes, log_file=log_file,
jamesren76fcf192010-04-21 20:39:50 +0000928 paired_with_pidfile=paired_with_pidfile, username=username,
929 drone_hostnames_allowed=drone_hostnames_allowed)
showard170873e2009-01-07 00:22:26 +0000930
931
showarded2afea2009-07-07 20:54:07 +0000932 def attach_to_existing_process(self, execution_path,
jamesrenc44ae992010-02-19 00:12:54 +0000933 pidfile_name=drone_manager.AUTOSERV_PID_FILE,
showardd1195652009-12-08 22:21:02 +0000934 num_processes=None):
showard170873e2009-01-07 00:22:26 +0000935 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +0000936 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +0000937 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +0000938 if num_processes is not None:
939 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +0000940
941
jadmanski0afbb632008-06-06 21:10:57 +0000942 def kill(self):
showard170873e2009-01-07 00:22:26 +0000943 if self.has_process():
944 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +0000945
mbligh36768f02008-02-22 18:28:33 +0000946
showard170873e2009-01-07 00:22:26 +0000947 def has_process(self):
showard21baa452008-10-21 00:08:39 +0000948 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +0000949 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +0000950
951
showard170873e2009-01-07 00:22:26 +0000952 def get_process(self):
showard21baa452008-10-21 00:08:39 +0000953 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +0000954 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +0000955 return self._state.process
mblighbb421852008-03-11 22:36:16 +0000956
957
showard170873e2009-01-07 00:22:26 +0000958 def _read_pidfile(self, use_second_read=False):
959 assert self.pidfile_id is not None, (
960 'You must call run() or attach_to_existing_process()')
961 contents = _drone_manager.get_pidfile_contents(
962 self.pidfile_id, use_second_read=use_second_read)
963 if contents.is_invalid():
964 self._state = drone_manager.PidfileContents()
965 raise self._PidfileException(contents)
966 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +0000967
968
showard21baa452008-10-21 00:08:39 +0000969 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +0000970 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
971 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +0000972 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +0000973 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +0000974
975
976 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +0000977 if self.lost_process:
showard21baa452008-10-21 00:08:39 +0000978 return
mblighbb421852008-03-11 22:36:16 +0000979
showard21baa452008-10-21 00:08:39 +0000980 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +0000981
showard170873e2009-01-07 00:22:26 +0000982 if self._state.process is None:
983 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +0000984 return
mbligh90a549d2008-03-25 23:52:34 +0000985
showard21baa452008-10-21 00:08:39 +0000986 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +0000987 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +0000988 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +0000989 return
mbligh90a549d2008-03-25 23:52:34 +0000990
showard170873e2009-01-07 00:22:26 +0000991 # pid but no running process - maybe process *just* exited
992 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +0000993 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +0000994 # autoserv exited without writing an exit code
995 # to the pidfile
showard21baa452008-10-21 00:08:39 +0000996 self._handle_pidfile_error(
997 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +0000998
showard21baa452008-10-21 00:08:39 +0000999
1000 def _get_pidfile_info(self):
1001 """\
1002 After completion, self._state will contain:
1003 pid=None, exit_status=None if autoserv has not yet run
1004 pid!=None, exit_status=None if autoserv is running
1005 pid!=None, exit_status!=None if autoserv has completed
1006 """
1007 try:
1008 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001009 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001010 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001011
1012
showard170873e2009-01-07 00:22:26 +00001013 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001014 """\
1015 Called when no pidfile is found or no pid is in the pidfile.
1016 """
showard170873e2009-01-07 00:22:26 +00001017 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001018 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001019 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001020 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001021 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001022
1023
showard35162b02009-03-03 02:17:30 +00001024 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001025 """\
1026 Called when autoserv has exited without writing an exit status,
1027 or we've timed out waiting for autoserv to write a pid to the
1028 pidfile. In either case, we just return failure and the caller
1029 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001030
showard170873e2009-01-07 00:22:26 +00001031 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001032 """
1033 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001034 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001035 self._state.exit_status = 1
1036 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001037
1038
jadmanski0afbb632008-06-06 21:10:57 +00001039 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001040 self._get_pidfile_info()
1041 return self._state.exit_status
1042
1043
1044 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001045 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001046 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001047 if self._state.num_tests_failed is None:
1048 return -1
showard21baa452008-10-21 00:08:39 +00001049 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001050
1051
showardcdaeae82009-08-31 18:32:48 +00001052 def try_copy_results_on_drone(self, **kwargs):
1053 if self.has_process():
1054 # copy results logs into the normal place for job results
1055 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1056
1057
1058 def try_copy_to_results_repository(self, source, **kwargs):
1059 if self.has_process():
1060 _drone_manager.copy_to_results_repository(self.get_process(),
1061 source, **kwargs)
1062
1063
mbligh36768f02008-02-22 18:28:33 +00001064class Agent(object):
showard77182562009-06-10 00:16:05 +00001065 """
showard8cc058f2009-09-08 16:26:33 +00001066 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001067
1068 The following methods are required on all task objects:
1069 poll() - Called periodically to let the task check its status and
1070 update its internal state. If the task succeeded.
1071 is_done() - Returns True if the task is finished.
1072 abort() - Called when an abort has been requested. The task must
1073 set its aborted attribute to True if it actually aborted.
1074
1075 The following attributes are required on all task objects:
1076 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001077 success - bool, True if this task succeeded.
1078 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1079 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001080 """
1081
1082
showard418785b2009-11-23 20:19:59 +00001083 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001084 """
showard8cc058f2009-09-08 16:26:33 +00001085 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001086 """
showard8cc058f2009-09-08 16:26:33 +00001087 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001088
showard77182562009-06-10 00:16:05 +00001089 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001090 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001091
showard8cc058f2009-09-08 16:26:33 +00001092 self.queue_entry_ids = task.queue_entry_ids
1093 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001094
showard8cc058f2009-09-08 16:26:33 +00001095 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001096 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001097
1098
jadmanski0afbb632008-06-06 21:10:57 +00001099 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001100 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001101 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001102 self.task.poll()
1103 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001104 self.finished = True
showardec113162008-05-08 00:52:49 +00001105
1106
jadmanski0afbb632008-06-06 21:10:57 +00001107 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001108 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001109
1110
showardd3dc1992009-04-22 21:01:40 +00001111 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001112 if self.task:
1113 self.task.abort()
1114 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001115 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001116 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001117
showardd3dc1992009-04-22 21:01:40 +00001118
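# Illustrative sketch only -- not used by the scheduler. It shows a minimal
# task object satisfying the interface described in the Agent docstring above;
# the class name and behavior are hypothetical. Real tasks in this file derive
# from BaseAgentTask (below), which adds process management on top of this
# interface.
class _ExampleNullTask(object):
    def __init__(self, queue_entry_ids=(), host_ids=()):
        self.success = None
        self.aborted = False
        self.queue_entry_ids = list(queue_entry_ids)
        self.host_ids = list(host_ids)
        self._done = False


    def poll(self):
        # A real task would check its process or database state here; this
        # sketch simply succeeds on the first poll.
        self.success = True
        self._done = True


    def is_done(self):
        return self._done


    def abort(self):
        # Tasks may choose to ignore aborts; this one honors them.
        self.aborted = True
        self._done = True

# A dispatcher could wrap such a task as Agent(_ExampleNullTask()) and drive
# it through tick()/is_done().

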
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001119class BaseAgentTask(object):
showardd1195652009-12-08 22:21:02 +00001120 class _NullMonitor(object):
1121 pidfile_id = None
1122
1123 def has_process(self):
1124 return True
1125
1126
1127 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001128 """
showardd1195652009-12-08 22:21:02 +00001129 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001130 """
jadmanski0afbb632008-06-06 21:10:57 +00001131 self.done = False
showardd1195652009-12-08 22:21:02 +00001132 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001133 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001134 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001135 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001136 self.queue_entry_ids = []
1137 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001138 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001139
1140
1141 def _set_ids(self, host=None, queue_entries=None):
1142 if queue_entries and queue_entries != [None]:
1143 self.host_ids = [entry.host.id for entry in queue_entries]
1144 self.queue_entry_ids = [entry.id for entry in queue_entries]
1145 else:
1146 assert host
1147 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001148
1149
jadmanski0afbb632008-06-06 21:10:57 +00001150 def poll(self):
showard08a36412009-05-05 01:01:13 +00001151 if not self.started:
1152 self.start()
showardd1195652009-12-08 22:21:02 +00001153 if not self.done:
1154 self.tick()
showard08a36412009-05-05 01:01:13 +00001155
1156
1157 def tick(self):
showardd1195652009-12-08 22:21:02 +00001158 assert self.monitor
1159 exit_code = self.monitor.exit_code()
1160 if exit_code is None:
1161 return
mbligh36768f02008-02-22 18:28:33 +00001162
showardd1195652009-12-08 22:21:02 +00001163 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001164 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001165
1166
jadmanski0afbb632008-06-06 21:10:57 +00001167 def is_done(self):
1168 return self.done
mbligh36768f02008-02-22 18:28:33 +00001169
1170
jadmanski0afbb632008-06-06 21:10:57 +00001171 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001172 if self.done:
showardd1195652009-12-08 22:21:02 +00001173 assert self.started
showard08a36412009-05-05 01:01:13 +00001174 return
showardd1195652009-12-08 22:21:02 +00001175 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001176 self.done = True
1177 self.success = success
1178 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001179
1180
jadmanski0afbb632008-06-06 21:10:57 +00001181 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001182 """
1183 To be overridden.
1184 """
showarded2afea2009-07-07 20:54:07 +00001185 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001186 self.register_necessary_pidfiles()
1187
1188
1189 def _log_file(self):
1190 if not self._log_file_name:
1191 return None
1192 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001193
mbligh36768f02008-02-22 18:28:33 +00001194
jadmanski0afbb632008-06-06 21:10:57 +00001195 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001196 log_file = self._log_file()
1197 if self.monitor and log_file:
1198 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001199
1200
jadmanski0afbb632008-06-06 21:10:57 +00001201 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001202 """
1203 To be overridden.
1204 """
jadmanski0afbb632008-06-06 21:10:57 +00001205 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001206 logging.info("%s finished with success=%s", type(self).__name__,
1207 self.success)
1208
mbligh36768f02008-02-22 18:28:33 +00001209
1210
jadmanski0afbb632008-06-06 21:10:57 +00001211 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001212 if not self.started:
1213 self.prolog()
1214 self.run()
1215
1216 self.started = True
1217
1218
1219 def abort(self):
1220 if self.monitor:
1221 self.monitor.kill()
1222 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001223 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001224 self.cleanup()
1225
1226
showarded2afea2009-07-07 20:54:07 +00001227 def _get_consistent_execution_path(self, execution_entries):
1228 first_execution_path = execution_entries[0].execution_path()
1229 for execution_entry in execution_entries[1:]:
1230 assert execution_entry.execution_path() == first_execution_path, (
1231 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1232 execution_entry,
1233 first_execution_path,
1234 execution_entries[0]))
1235 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001236
1237
showarded2afea2009-07-07 20:54:07 +00001238 def _copy_results(self, execution_entries, use_monitor=None):
1239 """
1240 @param execution_entries: list of objects with execution_path() method
1241 """
showard6d1c1432009-08-20 23:30:39 +00001242 if use_monitor is not None and not use_monitor.has_process():
1243 return
1244
showarded2afea2009-07-07 20:54:07 +00001245 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001246 if use_monitor is None:
1247 assert self.monitor
1248 use_monitor = self.monitor
1249 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001250 execution_path = self._get_consistent_execution_path(execution_entries)
1251 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001252 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001253
showarda1e74b32009-05-12 17:32:04 +00001254
1255 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001256 for queue_entry in queue_entries:
1257 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001258
1259
mbligh4608b002010-01-05 18:22:35 +00001260 def _archive_results(self, queue_entries):
1261 for queue_entry in queue_entries:
1262 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001263
1264
showardd1195652009-12-08 22:21:02 +00001265 def _command_line(self):
1266 """
1267 Return the command line to run. Must be overridden.
1268 """
1269 raise NotImplementedError
1270
1271
1272 @property
1273 def num_processes(self):
1274 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001275 Return the number of processes forked by this BaseAgentTask's process.
1276 It may only be approximate. To be overridden if necessary.
showardd1195652009-12-08 22:21:02 +00001277 """
1278 return 1
1279
1280
1281 def _paired_with_monitor(self):
1282 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001283 If this BaseAgentTask's process must run on the same machine as some
showardd1195652009-12-08 22:21:02 +00001284 previous process, this method should be overridden to return a
1285 PidfileRunMonitor for that process.
1286 """
1287 return self._NullMonitor()
1288
1289
1290 @property
1291 def owner_username(self):
1292 """
1293 Return login of user responsible for this task. May be None. Must be
1294 overridden.
1295 """
1296 raise NotImplementedError
1297
1298
1299 def _working_directory(self):
1300 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001301 Return the directory where this BaseAgentTask's process executes.
1302 Must be overridden.
showardd1195652009-12-08 22:21:02 +00001303 """
1304 raise NotImplementedError
1305
1306
1307 def _pidfile_name(self):
1308 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001309 Return the name of the pidfile this BaseAgentTask's process uses. To be
showardd1195652009-12-08 22:21:02 +00001310 overridden if necessary.
1311 """
jamesrenc44ae992010-02-19 00:12:54 +00001312 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001313
1314
1315 def _check_paired_results_exist(self):
1316 if not self._paired_with_monitor().has_process():
1317 email_manager.manager.enqueue_notify_email(
1318 'No paired results in task',
1319 'No paired results in task %s at %s'
1320 % (self, self._paired_with_monitor().pidfile_id))
1321 self.finished(False)
1322 return False
1323 return True
1324
1325
1326 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001327 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001328 self.monitor = PidfileRunMonitor()
1329
1330
1331 def run(self):
1332 if not self._check_paired_results_exist():
1333 return
1334
1335 self._create_monitor()
1336 self.monitor.run(
1337 self._command_line(), self._working_directory(),
1338 num_processes=self.num_processes,
1339 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1340 pidfile_name=self._pidfile_name(),
1341 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001342 username=self.owner_username,
1343 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1344
1345
1346 def get_drone_hostnames_allowed(self):
1347 if not models.DroneSet.drone_sets_enabled():
1348 return None
1349
1350 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1351 if not hqes:
1352 # Only special tasks could be missing host queue entries
1353 assert isinstance(self, SpecialAgentTask)
1354 return self._user_or_global_default_drone_set(
1355 self.task, self.task.requested_by)
1356
1357 job_ids = hqes.values_list('job', flat=True).distinct()
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001358 assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
jamesren76fcf192010-04-21 20:39:50 +00001359 "span multiple jobs")
1360
1361 job = models.Job.objects.get(id=job_ids[0])
1362 drone_set = job.drone_set
1363 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001364 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001365
1366 return drone_set.get_drone_hostnames()
1367
1368
1369 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1370 """
1371 Returns the user's default drone set, if present.
1372
1373 Otherwise, returns the global default drone set.
1374 """
1375 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1376 if not user:
1377 logging.warn('%s had no owner; using default drone set',
1378 obj_with_owner)
1379 return default_hostnames
1380 if not user.drone_set:
1381 logging.warn('User %s has no default drone set, using global '
1382 'default', user.login)
1383 return default_hostnames
1384 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001385
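    # Sketch of the drone-set resolution implemented by
    # get_drone_hostnames_allowed() and the helper above, in order of
    # precedence: the job's drone_set if one is assigned, else the owning
    # user's default drone set, else the global default from
    # models.DroneSet.get_default(). Special tasks without queue entries
    # resolve against task.requested_by instead of a job owner.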
1386
1387 def register_necessary_pidfiles(self):
1388 pidfile_id = _drone_manager.get_pidfile_id_from(
1389 self._working_directory(), self._pidfile_name())
1390 _drone_manager.register_pidfile(pidfile_id)
1391
1392 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1393 if paired_pidfile_id:
1394 _drone_manager.register_pidfile(paired_pidfile_id)
1395
1396
1397 def recover(self):
1398 if not self._check_paired_results_exist():
1399 return
1400
1401 self._create_monitor()
1402 self.monitor.attach_to_existing_process(
1403 self._working_directory(), pidfile_name=self._pidfile_name(),
1404 num_processes=self.num_processes)
1405 if not self.monitor.has_process():
1406 # no process to recover; wait to be started normally
1407 self.monitor = None
1408 return
1409
1410 self.started = True
Aviv Keshet225bdfe2013-03-05 10:10:08 -08001411 logging.info('Recovering process %s for %s at %s',
1412 self.monitor.get_process(), type(self).__name__,
1413 self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001414
1415
mbligh4608b002010-01-05 18:22:35 +00001416 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1417 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001418 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001419 for entry in queue_entries:
1420 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001421 raise host_scheduler.SchedulerError(
1422 '%s attempting to start entry with invalid status %s: '
1423 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001424 invalid_host_status = (
1425 allowed_host_statuses is not None
1426 and entry.host.status not in allowed_host_statuses)
1427 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001428 raise host_scheduler.SchedulerError(
1429 '%s attempting to start on queue entry with invalid '
1430 'host status %s: %s'
1431 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001432
1433
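# Illustrative sketch only -- never instantiated by the scheduler. It shows
# the hooks a concrete BaseAgentTask subclass overrides: _command_line() and
# _working_directory() are required, owner_username must be provided, and
# prolog()/epilog() may be extended. The command, directory and log file name
# below are hypothetical placeholders.
class _ExampleEchoTask(BaseAgentTask):
    def __init__(self):
        super(_ExampleEchoTask, self).__init__(log_file_name='.echo.log')


    def _command_line(self):
        # Command handed to PidfileRunMonitor.run() by BaseAgentTask.run().
        return ['/bin/echo', 'example task output']


    def _working_directory(self):
        # Directory (relative to the results repository) holding the pidfile
        # and logs for this task.
        return 'hosts/example_host/1-example'


    @property
    def owner_username(self):
        # Login of the user responsible for this task; None is permitted.
        return None

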
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001434SiteAgentTask = utils.import_site_class(
1435 __file__, 'autotest_lib.scheduler.site_monitor_db',
1436 'SiteAgentTask', BaseAgentTask)
1437
1438class AgentTask(SiteAgentTask):
1439 pass
1440
1441
showardd9205182009-04-27 20:09:55 +00001442class TaskWithJobKeyvals(object):
1443 """AgentTask mixin providing functionality to help with job keyval files."""
1444 _KEYVAL_FILE = 'keyval'
1445 def _format_keyval(self, key, value):
1446 return '%s=%s' % (key, value)
1447
1448
1449 def _keyval_path(self):
1450 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001451 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001452
1453
1454 def _write_keyval_after_job(self, field, value):
1455 assert self.monitor
1456 if not self.monitor.has_process():
1457 return
1458 _drone_manager.write_lines_to_file(
1459 self._keyval_path(), [self._format_keyval(field, value)],
1460 paired_with_process=self.monitor.get_process())
1461
1462
1463 def _job_queued_keyval(self, job):
1464 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1465
1466
1467 def _write_job_finished(self):
1468 self._write_keyval_after_job("job_finished", int(time.time()))
1469
1470
showarddb502762009-09-09 15:31:20 +00001471 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1472 keyval_contents = '\n'.join(self._format_keyval(key, value)
1473 for key, value in keyval_dict.iteritems())
1474 # always end with a newline to allow additional keyvals to be written
1475 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001476 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001477 keyval_contents,
1478 file_path=keyval_path)
1479
1480
1481 def _write_keyvals_before_job(self, keyval_dict):
1482 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1483
1484
1485 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001486 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001487 host.hostname)
1488 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001489 all_labels = [ urllib.quote(label) for label in all_labels ]
showarddb502762009-09-09 15:31:20 +00001490 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1491 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1492
1493
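# Illustrative sketch only: the on-disk format produced by the
# TaskWithJobKeyvals helpers above. The dictionary below is hypothetical;
# real keyvals come from Job.keyval_dict() plus entries such as
# job_queued/job_finished and the per-host platform/labels keyvals.
def _example_keyval_contents():
    keyval_dict = {'job_queued': 1262304000, 'host_group_name': 'group0'}
    # One key=value pair per line, ending with a newline so later keyvals
    # (e.g. job_finished, written after the job) can simply be appended.
    return '\n'.join('%s=%s' % (key, value)
                     for key, value in keyval_dict.iteritems()) + '\n'

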
showard8cc058f2009-09-08 16:26:33 +00001494class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001495 """
1496 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1497 """
1498
1499 TASK_TYPE = None
1500 host = None
1501 queue_entry = None
1502
showardd1195652009-12-08 22:21:02 +00001503 def __init__(self, task, extra_command_args):
1504 super(SpecialAgentTask, self).__init__()
1505
lmrb7c5d272010-04-16 06:34:04 +00001506 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001507
jamesrenc44ae992010-02-19 00:12:54 +00001508 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001509 self.queue_entry = None
1510 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001511 self.queue_entry = scheduler_models.HostQueueEntry(
1512 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001513
showarded2afea2009-07-07 20:54:07 +00001514 self.task = task
1515 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001516
1517
showard8cc058f2009-09-08 16:26:33 +00001518 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001519 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1520
1521
1522 def _command_line(self):
1523 return _autoserv_command_line(self.host.hostname,
1524 self._extra_command_args,
1525 queue_entry=self.queue_entry)
1526
1527
1528 def _working_directory(self):
1529 return self.task.execution_path()
1530
1531
1532 @property
1533 def owner_username(self):
1534 if self.task.requested_by:
1535 return self.task.requested_by.login
1536 return None
showard8cc058f2009-09-08 16:26:33 +00001537
1538
showarded2afea2009-07-07 20:54:07 +00001539 def prolog(self):
1540 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001541 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001542 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001543
1544
showardde634ee2009-01-30 01:44:24 +00001545 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001546 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001547
showard2fe3f1d2009-07-06 20:19:11 +00001548 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001549 return # don't fail metahost entries, they'll be reassigned
1550
showard2fe3f1d2009-07-06 20:19:11 +00001551 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001552 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001553 return # entry has been aborted
1554
showard2fe3f1d2009-07-06 20:19:11 +00001555 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001556 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001557 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001558 self._write_keyval_after_job(queued_key, queued_time)
1559 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001560
showard8cc058f2009-09-08 16:26:33 +00001561 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001562 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001563 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001564 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001565
showard8cc058f2009-09-08 16:26:33 +00001566 pidfile_id = _drone_manager.get_pidfile_id_from(
1567 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001568 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001569 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001570
1571 if self.queue_entry.job.parse_failed_repair:
1572 self._parse_results([self.queue_entry])
1573 else:
1574 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001575
Alex Miller23676a22013-07-03 09:03:36 -07001576 # Also fail all other special tasks that have not yet run for this HQE
1577 pending_tasks = models.SpecialTask.objects.filter(
1578 queue_entry__id=self.queue_entry.id,
1579 is_complete=0)
1580 if pending_tasks:
1581 for task in pending_tasks:
1582 task.finish(False)
1583
showard8cc058f2009-09-08 16:26:33 +00001584
1585 def cleanup(self):
1586 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001587
1588 # We will consider an aborted task to be "Failed"
1589 self.task.finish(bool(self.success))
1590
showardf85a0b72009-10-07 20:48:45 +00001591 if self.monitor:
1592 if self.monitor.has_process():
1593 self._copy_results([self.task])
1594 if self.monitor.pidfile_id is not None:
1595 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001596
1597
1598class RepairTask(SpecialAgentTask):
1599 TASK_TYPE = models.SpecialTask.Task.REPAIR
1600
1601
showardd1195652009-12-08 22:21:02 +00001602 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001603 """\
1604 task: models.SpecialTask to run; if it has an associated queue entry, that entry is marked failed when this repair fails.
1605 """
1606 protection = host_protections.Protection.get_string(
1607 task.host.protection)
1608 # normalize the protection name
1609 protection = host_protections.Protection.get_attr_name(protection)
1610
1611 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001612 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001613
1614 # *don't* include the queue entry in IDs -- if the queue entry is
1615 # aborted, we want to leave the repair task running
1616 self._set_ids(host=self.host)
1617
1618
1619 def prolog(self):
1620 super(RepairTask, self).prolog()
1621 logging.info("repair_task starting")
1622 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001623
1624
jadmanski0afbb632008-06-06 21:10:57 +00001625 def epilog(self):
1626 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001627
jadmanski0afbb632008-06-06 21:10:57 +00001628 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001629 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001630 else:
showard8cc058f2009-09-08 16:26:33 +00001631 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001632 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001633 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001634
1635
showarded2afea2009-07-07 20:54:07 +00001636class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001637 def _copy_to_results_repository(self):
1638 if not self.queue_entry or self.queue_entry.meta_host:
1639 return
1640
1641 self.queue_entry.set_execution_subdir()
1642 log_name = os.path.basename(self.task.execution_path())
1643 source = os.path.join(self.task.execution_path(), 'debug',
1644 'autoserv.DEBUG')
1645 destination = os.path.join(
1646 self.queue_entry.execution_path(), log_name)
1647
1648 self.monitor.try_copy_to_results_repository(
1649 source, destination_path=destination)
1650
1651
showard170873e2009-01-07 00:22:26 +00001652 def epilog(self):
1653 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001654
showard775300b2009-09-09 15:30:50 +00001655 if self.success:
1656 return
showard8fe93b52008-11-18 17:53:22 +00001657
showard775300b2009-09-09 15:30:50 +00001658 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001659
showard775300b2009-09-09 15:30:50 +00001660 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001661 # effectively ignore failure for these hosts
1662 self.success = True
showard775300b2009-09-09 15:30:50 +00001663 return
1664
1665 if self.queue_entry:
1666 self.queue_entry.requeue()
1667
1668 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001669 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001670 queue_entry__id=self.queue_entry.id):
1671 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1672 self._fail_queue_entry()
1673 return
1674
showard9bb960b2009-11-19 01:02:11 +00001675 queue_entry = models.HostQueueEntry.objects.get(
1676 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001677 else:
1678 queue_entry = None
1679
1680 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001681 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001682 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001683 queue_entry=queue_entry,
1684 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001685
showard8fe93b52008-11-18 17:53:22 +00001686
Alex Miller42437f92013-05-28 12:58:54 -07001687 def _should_pending(self):
1688 """
1689 Decide if we should call the host queue entry's on_pending method.
1690 We should if:
1691 1) There exists an associated host queue entry.
1692 2) The current special task completed successfully.
1693 3) There do not exist any more special tasks to be run before the
1694 host queue entry starts.
1695
1696 @returns: True if we should call on_pending, False otherwise.
1697
1698 """
1699 if not self.queue_entry or not self.success:
1700 return False
1701
1702 # We know if this is the last one when we create it, so we could add
1703 # another column to the database to keep track of this information, but
1704 # I expect the overhead of querying here to be minimal.
1705 queue_entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1706 queued = models.SpecialTask.objects.filter(
1707 host__id=self.host.id, is_active=False,
1708 is_complete=False, queue_entry=queue_entry)
1709 queued = queued.exclude(id=self.task.id)
1710 return queued.count() == 0
1711
1712
showard8fe93b52008-11-18 17:53:22 +00001713class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001714 TASK_TYPE = models.SpecialTask.Task.VERIFY
1715
1716
showardd1195652009-12-08 22:21:02 +00001717 def __init__(self, task):
1718 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001719 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001720
1721
jadmanski0afbb632008-06-06 21:10:57 +00001722 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001723 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001724
showardb18134f2009-03-20 20:52:18 +00001725 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001726 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001727 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1728 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001729
jamesren42318f72010-05-10 23:40:59 +00001730 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00001731 # and there's no need to keep records of other requests.
1732 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00001733 host__id=self.host.id,
1734 task=models.SpecialTask.Task.VERIFY,
jamesren42318f72010-05-10 23:40:59 +00001735 is_active=False, is_complete=False, queue_entry=None)
showarded2afea2009-07-07 20:54:07 +00001736 queued_verifies = queued_verifies.exclude(id=self.task.id)
1737 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00001738
mbligh36768f02008-02-22 18:28:33 +00001739
jadmanski0afbb632008-06-06 21:10:57 +00001740 def epilog(self):
1741 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001742 if self.success:
Alex Miller42437f92013-05-28 12:58:54 -07001743 if self._should_pending():
showard8cc058f2009-09-08 16:26:33 +00001744 self.queue_entry.on_pending()
1745 else:
1746 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001747
1748
mbligh4608b002010-01-05 18:22:35 +00001749class CleanupTask(PreJobTask):
1750 # note this can also run post-job, but when it does, it's running standalone
1751 # against the host (not related to the job), so it's not considered a
1752 # PostJobTask
1753
1754 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1755
1756
1757 def __init__(self, task, recover_run_monitor=None):
1758 super(CleanupTask, self).__init__(task, ['--cleanup'])
1759 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1760
1761
1762 def prolog(self):
1763 super(CleanupTask, self).prolog()
1764 logging.info("starting cleanup task for host: %s", self.host.hostname)
1765 self.host.set_status(models.Host.Status.CLEANING)
1766 if self.queue_entry:
1767 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1768
1769
1770 def _finish_epilog(self):
1771 if not self.queue_entry or not self.success:
1772 return
1773
1774 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
1775 should_run_verify = (
1776 self.queue_entry.job.run_verify
1777 and self.host.protection != do_not_verify_protection)
1778 if should_run_verify:
1779 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1780 models.SpecialTask.objects.create(
1781 host=models.Host.objects.get(id=self.host.id),
1782 queue_entry=entry,
1783 task=models.SpecialTask.Task.VERIFY)
1784 else:
Alex Miller42437f92013-05-28 12:58:54 -07001785 if self._should_pending():
1786 self.queue_entry.on_pending()
mbligh4608b002010-01-05 18:22:35 +00001787
1788
1789 def epilog(self):
1790 super(CleanupTask, self).epilog()
1791
1792 if self.success:
1793 self.host.update_field('dirty', 0)
1794 self.host.set_status(models.Host.Status.READY)
1795
1796 self._finish_epilog()
1797
1798
showarda9545c02009-12-18 22:44:26 +00001799class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
1800 """
1801 Common functionality for QueueTask and HostlessQueueTask
1802 """
1803 def __init__(self, queue_entries):
1804 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001805 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001806 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001807
1808
showard73ec0442009-02-07 02:05:20 +00001809 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001810 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001811
1812
jamesrenc44ae992010-02-19 00:12:54 +00001813 def _write_control_file(self, execution_path):
1814 control_path = _drone_manager.attach_file_to_execution(
1815 execution_path, self.job.control_file)
1816 return control_path
1817
1818
Aviv Keshet308e7362013-05-21 14:43:16 -07001819 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001820 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001821 execution_path = self.queue_entries[0].execution_path()
1822 control_path = self._write_control_file(execution_path)
1823 hostnames = ','.join(entry.host.hostname
1824 for entry in self.queue_entries
1825 if not entry.is_hostless())
1826
1827 execution_tag = self.queue_entries[0].execution_tag()
1828 params = _autoserv_command_line(
1829 hostnames,
beepscb6f1e22013-06-28 19:14:10 -07001830 ['-P', execution_tag, '-n', '--verify_job_repo_url',
jamesrenc44ae992010-02-19 00:12:54 +00001831 _drone_manager.absolute_path(control_path)],
1832 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001833 if self.job.is_image_update_job():
1834 params += ['--image', self.job.update_image_path]
1835
jamesrenc44ae992010-02-19 00:12:54 +00001836 return params
showardd1195652009-12-08 22:21:02 +00001837
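    # Rough illustration with hypothetical values, not an exact invocation:
    # for a two-host job the extra arguments assembled above amount to
    #   ['-P', '123-owner/group0', '-n', '--verify_job_repo_url',
    #    '/usr/local/autotest/results/123-owner/group0/control']
    # which _autoserv_command_line() combines with 'host1,host2' and the job
    # (verbose=False); '--image <path>' is appended for image-update jobs.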
1838
1839 @property
1840 def num_processes(self):
1841 return len(self.queue_entries)
1842
1843
1844 @property
1845 def owner_username(self):
1846 return self.job.owner
1847
1848
1849 def _working_directory(self):
1850 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001851
1852
jadmanski0afbb632008-06-06 21:10:57 +00001853 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001854 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001855 keyval_dict = self.job.keyval_dict()
1856 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001857 group_name = self.queue_entries[0].get_group_name()
1858 if group_name:
1859 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001860 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001861 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001862 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001863 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001864
1865
showard35162b02009-03-03 02:17:30 +00001866 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001867 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001868 _drone_manager.write_lines_to_file(error_file_path,
1869 [_LOST_PROCESS_ERROR])
1870
1871
showardd3dc1992009-04-22 21:01:40 +00001872 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001873 if not self.monitor:
1874 return
1875
showardd9205182009-04-27 20:09:55 +00001876 self._write_job_finished()
1877
showard35162b02009-03-03 02:17:30 +00001878 if self.monitor.lost_process:
1879 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001880
jadmanskif7fa2cc2008-10-01 14:13:23 +00001881
showardcbd74612008-11-19 21:42:02 +00001882 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001883 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001884 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001885 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001886 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001887
1888
jadmanskif7fa2cc2008-10-01 14:13:23 +00001889 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001890 if not self.monitor or not self.monitor.has_process():
1891 return
1892
jadmanskif7fa2cc2008-10-01 14:13:23 +00001893 # build up sets of all the aborted_by and aborted_on values
1894 aborted_by, aborted_on = set(), set()
1895 for queue_entry in self.queue_entries:
1896 if queue_entry.aborted_by:
1897 aborted_by.add(queue_entry.aborted_by)
1898 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1899 aborted_on.add(t)
1900
1901 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001902 # TODO(showard): this conditional is now obsolete, we just need to leave
1903 # it in temporarily for backwards compatibility over upgrades. delete
1904 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001905 assert len(aborted_by) <= 1
1906 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001907 aborted_by_value = aborted_by.pop()
1908 aborted_on_value = max(aborted_on)
1909 else:
1910 aborted_by_value = 'autotest_system'
1911 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001912
showarda0382352009-02-11 23:36:43 +00001913 self._write_keyval_after_job("aborted_by", aborted_by_value)
1914 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001915
showardcbd74612008-11-19 21:42:02 +00001916 aborted_on_string = str(datetime.datetime.fromtimestamp(
1917 aborted_on_value))
1918 self._write_status_comment('Job aborted by %s on %s' %
1919 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001920
1921
jadmanski0afbb632008-06-06 21:10:57 +00001922 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001923 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001924 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001925 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001926
1927
jadmanski0afbb632008-06-06 21:10:57 +00001928 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001929 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001930 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001931
1932
1933class QueueTask(AbstractQueueTask):
1934 def __init__(self, queue_entries):
1935 super(QueueTask, self).__init__(queue_entries)
1936 self._set_ids(queue_entries=queue_entries)
1937
1938
1939 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001940 self._check_queue_entry_statuses(
1941 self.queue_entries,
1942 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
1943 models.HostQueueEntry.Status.RUNNING),
1944 allowed_host_statuses=(models.Host.Status.PENDING,
1945 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00001946
1947 super(QueueTask, self).prolog()
1948
1949 for queue_entry in self.queue_entries:
1950 self._write_host_keyvals(queue_entry.host)
1951 queue_entry.host.set_status(models.Host.Status.RUNNING)
1952 queue_entry.host.update_field('dirty', 1)
showarda9545c02009-12-18 22:44:26 +00001953
1954
1955 def _finish_task(self):
1956 super(QueueTask, self)._finish_task()
1957
1958 for queue_entry in self.queue_entries:
1959 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00001960 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00001961
1962
mbligh4608b002010-01-05 18:22:35 +00001963class HostlessQueueTask(AbstractQueueTask):
1964 def __init__(self, queue_entry):
1965 super(HostlessQueueTask, self).__init__([queue_entry])
1966 self.queue_entry_ids = [queue_entry.id]
1967
1968
1969 def prolog(self):
1970 self.queue_entries[0].update_field('execution_subdir', 'hostless')
1971 super(HostlessQueueTask, self).prolog()
1972
1973
mbligh4608b002010-01-05 18:22:35 +00001974 def _finish_task(self):
1975 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00001976 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001977
1978
showardd3dc1992009-04-22 21:01:40 +00001979class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00001980 def __init__(self, queue_entries, log_file_name):
1981 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00001982
showardd1195652009-12-08 22:21:02 +00001983 self.queue_entries = queue_entries
1984
showardd3dc1992009-04-22 21:01:40 +00001985 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00001986 self._autoserv_monitor.attach_to_existing_process(
1987 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00001988
showardd1195652009-12-08 22:21:02 +00001989
1990 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00001991 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00001992 return 'true'
1993 return self._generate_command(
1994 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00001995
1996
1997 def _generate_command(self, results_dir):
1998 raise NotImplementedError('Subclasses must override this')
1999
2000
showardd1195652009-12-08 22:21:02 +00002001 @property
2002 def owner_username(self):
2003 return self.queue_entries[0].job.owner
2004
2005
2006 def _working_directory(self):
2007 return self._get_consistent_execution_path(self.queue_entries)
2008
2009
2010 def _paired_with_monitor(self):
2011 return self._autoserv_monitor
2012
2013
showardd3dc1992009-04-22 21:01:40 +00002014 def _job_was_aborted(self):
2015 was_aborted = None
showardd1195652009-12-08 22:21:02 +00002016 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002017 queue_entry.update_from_database()
2018 if was_aborted is None: # first queue entry
2019 was_aborted = bool(queue_entry.aborted)
2020 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
jamesren17cadd62010-06-16 23:26:55 +00002021 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
2022 for entry in self.queue_entries]
showardd3dc1992009-04-22 21:01:40 +00002023 email_manager.manager.enqueue_notify_email(
jamesren17cadd62010-06-16 23:26:55 +00002024 'Inconsistent abort state',
2025 'Queue entries have inconsistent abort state:\n' +
2026 '\n'.join(entries))
showardd3dc1992009-04-22 21:01:40 +00002027 # don't crash here, just assume true
2028 return True
2029 return was_aborted
2030
2031
showardd1195652009-12-08 22:21:02 +00002032 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002033 if self._job_was_aborted():
2034 return models.HostQueueEntry.Status.ABORTED
2035
2036 # we'll use a PidfileRunMonitor to read the autoserv exit status
2037 if self._autoserv_monitor.exit_code() == 0:
2038 return models.HostQueueEntry.Status.COMPLETED
2039 return models.HostQueueEntry.Status.FAILED
2040
2041
showardd3dc1992009-04-22 21:01:40 +00002042 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002043 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002044 queue_entry.set_status(status)
2045
2046
2047 def abort(self):
2048 # override AgentTask.abort() to avoid killing the process and ending
2049 # the task. post-job tasks continue when the job is aborted.
2050 pass
2051
2052
mbligh4608b002010-01-05 18:22:35 +00002053 def _pidfile_label(self):
2054 # '.autoserv_execute' -> 'autoserv'
2055 return self._pidfile_name()[1:-len('_execute')]
2056
2057
showard9bb960b2009-11-19 01:02:11 +00002058class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002059 """
2060 Task responsible for
2061 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2062 * copying logs to the results repository
2063 * spawning CleanupTasks for hosts, if necessary
2064 * spawning a FinalReparseTask for the job
2065 """
showardd1195652009-12-08 22:21:02 +00002066 def __init__(self, queue_entries, recover_run_monitor=None):
2067 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002068 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002069 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002070 self._set_ids(queue_entries=queue_entries)
2071
2072
Aviv Keshet308e7362013-05-21 14:43:16 -07002073 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd3dc1992009-04-22 21:01:40 +00002074 def _generate_command(self, results_dir):
2075 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002076 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00002077 return [_autoserv_path , '-p',
2078 '--pidfile-label=%s' % self._pidfile_label(),
2079 '--use-existing-results', '--collect-crashinfo',
2080 '-m', host_list, '-r', results_dir]
showardd3dc1992009-04-22 21:01:40 +00002081
2082
showardd1195652009-12-08 22:21:02 +00002083 @property
2084 def num_processes(self):
2085 return len(self.queue_entries)
2086
2087
2088 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002089 return drone_manager.CRASHINFO_PID_FILE
showardd1195652009-12-08 22:21:02 +00002090
2091
showardd3dc1992009-04-22 21:01:40 +00002092 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002093 self._check_queue_entry_statuses(
2094 self.queue_entries,
2095 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
2096 allowed_host_statuses=(models.Host.Status.RUNNING,))
showard8cc058f2009-09-08 16:26:33 +00002097
showardd3dc1992009-04-22 21:01:40 +00002098 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002099
2100
showardd3dc1992009-04-22 21:01:40 +00002101 def epilog(self):
2102 super(GatherLogsTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002103 self._parse_results(self.queue_entries)
showard9bb960b2009-11-19 01:02:11 +00002104 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002105
showard9bb960b2009-11-19 01:02:11 +00002106
2107 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002108 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002109 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002110 models.HostQueueEntry.Status.COMPLETED)
2111 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2112 else:
2113 final_success = False
2114 num_tests_failed = 0
2115
showard9bb960b2009-11-19 01:02:11 +00002116 reboot_after = self._job.reboot_after
2117 do_reboot = (
2118 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002119 self._final_status() == models.HostQueueEntry.Status.ABORTED
jamesrendd855242010-03-02 22:23:44 +00002120 or reboot_after == model_attributes.RebootAfter.ALWAYS
2121 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
showard9bb960b2009-11-19 01:02:11 +00002122 and final_success and num_tests_failed == 0))
2123
showardd1195652009-12-08 22:21:02 +00002124 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002125 if do_reboot:
2126 # don't pass the queue entry to the CleanupTask. if the cleanup
2127 # fails, the job doesn't care -- it's over.
2128 models.SpecialTask.objects.create(
2129 host=models.Host.objects.get(id=queue_entry.host.id),
2130 task=models.SpecialTask.Task.CLEANUP,
2131 requested_by=self._job.owner_model())
2132 else:
2133 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002134
2135
showard0bbfc212009-04-29 21:06:13 +00002136 def run(self):
showard597bfd32009-05-08 18:22:50 +00002137 autoserv_exit_code = self._autoserv_monitor.exit_code()
2138 # only run if Autoserv exited due to some signal. if we have no exit
2139 # code, assume something bad (and signal-like) happened.
2140 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002141 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002142 else:
2143 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002144
2145
mbligh4608b002010-01-05 18:22:35 +00002146class SelfThrottledPostJobTask(PostJobTask):
2147 """
2148 Special AgentTask subclass that maintains its own global process limit.
2149 """
2150 _num_running_processes = 0
showarded2afea2009-07-07 20:54:07 +00002151
2152
mbligh4608b002010-01-05 18:22:35 +00002153 @classmethod
2154 def _increment_running_processes(cls):
2155 cls._num_running_processes += 1
mbligh16c722d2008-03-05 00:58:44 +00002156
mblighd5c95802008-03-05 00:33:46 +00002157
mbligh4608b002010-01-05 18:22:35 +00002158 @classmethod
2159 def _decrement_running_processes(cls):
2160 cls._num_running_processes -= 1
showard8cc058f2009-09-08 16:26:33 +00002161
2162
mbligh4608b002010-01-05 18:22:35 +00002163 @classmethod
2164 def _max_processes(cls):
2165 raise NotImplementedError
2166
2167
2168 @classmethod
2169 def _can_run_new_process(cls):
2170 return cls._num_running_processes < cls._max_processes()
2171
2172
2173 def _process_started(self):
2174 return bool(self.monitor)
2175
2176
2177 def tick(self):
2178 # override tick to keep trying to start until the process count goes
2179 # down and we can, at which point we revert to default behavior
2180 if self._process_started():
2181 super(SelfThrottledPostJobTask, self).tick()
2182 else:
2183 self._try_starting_process()
2184
2185
2186 def run(self):
2187 # override run() to not actually run unless we can
2188 self._try_starting_process()
2189
2190
2191 def _try_starting_process(self):
2192 if not self._can_run_new_process():
showard775300b2009-09-09 15:30:50 +00002193 return
2194
mbligh4608b002010-01-05 18:22:35 +00002195 # actually run the command
2196 super(SelfThrottledPostJobTask, self).run()
jamesren25663562010-04-27 18:00:55 +00002197 if self._process_started():
2198 self._increment_running_processes()
mblighd5c95802008-03-05 00:33:46 +00002199
mblighd5c95802008-03-05 00:33:46 +00002200
mbligh4608b002010-01-05 18:22:35 +00002201 def finished(self, success):
2202 super(SelfThrottledPostJobTask, self).finished(success)
2203 if self._process_started():
2204 self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002205
showard21baa452008-10-21 00:08:39 +00002206
mbligh4608b002010-01-05 18:22:35 +00002207class FinalReparseTask(SelfThrottledPostJobTask):
showardd1195652009-12-08 22:21:02 +00002208 def __init__(self, queue_entries):
2209 super(FinalReparseTask, self).__init__(queue_entries,
2210 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002211 # don't use _set_ids, since we don't want to set the host_ids
2212 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002213
2214
2215 def _generate_command(self, results_dir):
mbligh4608b002010-01-05 18:22:35 +00002216 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd1195652009-12-08 22:21:02 +00002217 results_dir]
2218
2219
2220 @property
2221 def num_processes(self):
2222 return 0 # don't include parser processes in accounting
2223
2224
2225 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002226 return drone_manager.PARSER_PID_FILE
showardd1195652009-12-08 22:21:02 +00002227
2228
showard97aed502008-11-04 02:01:24 +00002229 @classmethod
mbligh4608b002010-01-05 18:22:35 +00002230 def _max_processes(cls):
2231 return scheduler_config.config.max_parse_processes
showard97aed502008-11-04 02:01:24 +00002232
2233
2234 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002235 self._check_queue_entry_statuses(
2236 self.queue_entries,
2237 allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
showard8cc058f2009-09-08 16:26:33 +00002238
showard97aed502008-11-04 02:01:24 +00002239 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002240
2241
2242 def epilog(self):
2243 super(FinalReparseTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002244 self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002245
2246
mbligh4608b002010-01-05 18:22:35 +00002247class ArchiveResultsTask(SelfThrottledPostJobTask):
showarde1575b52010-01-15 00:21:12 +00002248 _ARCHIVING_FAILED_FILE = '.archiver_failed'
2249
mbligh4608b002010-01-05 18:22:35 +00002250 def __init__(self, queue_entries):
2251 super(ArchiveResultsTask, self).__init__(queue_entries,
2252 log_file_name='.archiving.log')
2253 # don't use _set_ids, since we don't want to set the host_ids
2254 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00002255
2256
mbligh4608b002010-01-05 18:22:35 +00002257 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002258 return drone_manager.ARCHIVER_PID_FILE
showard97aed502008-11-04 02:01:24 +00002259
2260
Aviv Keshet308e7362013-05-21 14:43:16 -07002261 # TODO: Refactor into autoserv_utils. crbug.com/243090
mbligh4608b002010-01-05 18:22:35 +00002262 def _generate_command(self, results_dir):
2263 return [_autoserv_path , '-p',
2264 '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
mblighe0cbc912010-03-11 18:03:07 +00002265 '--use-existing-results', '--control-filename=control.archive',
showard948eb302010-01-15 00:16:20 +00002266 os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
2267 'archive_results.control.srv')]
showard97aed502008-11-04 02:01:24 +00002268
2269
mbligh4608b002010-01-05 18:22:35 +00002270 @classmethod
2271 def _max_processes(cls):
2272 return scheduler_config.config.max_transfer_processes
showarda9545c02009-12-18 22:44:26 +00002273
2274
2275 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002276 self._check_queue_entry_statuses(
2277 self.queue_entries,
2278 allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
2279
2280 super(ArchiveResultsTask, self).prolog()
showarda9545c02009-12-18 22:44:26 +00002281
2282
mbligh4608b002010-01-05 18:22:35 +00002283 def epilog(self):
2284 super(ArchiveResultsTask, self).epilog()
showard4076c632010-01-15 20:28:49 +00002285 if not self.success and self._paired_with_monitor().has_process():
showarde1575b52010-01-15 00:21:12 +00002286 failed_file = os.path.join(self._working_directory(),
2287 self._ARCHIVING_FAILED_FILE)
2288 paired_process = self._paired_with_monitor().get_process()
2289 _drone_manager.write_lines_to_file(
2290 failed_file, ['Archiving failed with exit code %s'
2291 % self.monitor.exit_code()],
2292 paired_with_process=paired_process)
mbligh4608b002010-01-05 18:22:35 +00002293 self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002294
2295
mbligh36768f02008-02-22 18:28:33 +00002296if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002297 main()