#!/usr/bin/python -u
#pylint: disable-msg=C0111

"""
Autotest scheduler
"""


import datetime, optparse, os, signal
import sys, time, traceback, urllib
import logging, gc

import common
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.server import autoserv_utils
from autotest_lib.site_utils.graphite import stats

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_directory = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server')
_autoserv_path = os.path.join(_autoserv_directory, 'autoserv')
_testing_mode = False
_drone_manager = None


def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60

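# A hedged sketch of the global_config.ini entry read above (the value is
# hypothetical and not taken from this repository; scheduler_config.CONFIG_SECTION
# names the scheduler's section of the config):
#
#   [SCHEDULER]
#   pidfile_timeout_mins: 300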

def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)

def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
        logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")

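# A hedged sketch of the [SCHEDULER] options consumed by initialize() above
# (hostnames are hypothetical; both options default to 'localhost'):
#
#   [SCHEDULER]
#   drones: drone1.example.com, drone2.example.com
#   results_host: localhost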

def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)

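# Illustrative only (values are hypothetical): per the docstring above, for a
# server-side job named 'my-suite' owned by 'someuser' running on one machine,
# the returned list would look roughly like
#
#   [_autoserv_path, '-m', 'host1.example.com',
#    '-u', 'someuser', '-l', 'my-suite', '-s'] + extra_args
#
# with the exact flags and their ordering determined by
# autoserv_utils.autoserv_run_job_command().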

class BaseDispatcher(object):


    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def _log_extra_msg(self, msg):
        if self._extra_debugging:
            logging.debug(msg)


    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1

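    # Hedged note: the per-step messages logged by tick() above only appear
    # when the tick_debug option is true (see __init__).  A hypothetical
    # global_config.ini fragment enabling both debug switches read there:
    #
    #   [SCHEDULER]
    #   tick_debug: True
    #   extra_debugging: True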

    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)

624 """
625 Execute queued SpecialTasks that are ready to run on idle hosts.
626 """
627 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000628 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000629 continue
showardd1195652009-12-08 22:21:02 +0000630 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000631
632
showard170873e2009-01-07 00:22:26 +0000633 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000634 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000635 # should never happen
showarded2afea2009-07-07 20:54:07 +0000636 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000637 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000638 self._reverify_hosts_where(
showard8cc058f2009-09-08 16:26:33 +0000639 "status IN ('Repairing', 'Verifying', 'Cleaning')",
showarded2afea2009-07-07 20:54:07 +0000640 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000641
642
jadmanski0afbb632008-06-06 21:10:57 +0000643 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000644 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700645 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000646 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000647 if self.host_has_agent(host):
648 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000649 continue
showard8cc058f2009-09-08 16:26:33 +0000650 if self._host_has_scheduled_special_task(host):
651 # host will have a special task scheduled on the next cycle
652 continue
showard170873e2009-01-07 00:22:26 +0000653 if print_message:
showardb18134f2009-03-20 20:52:18 +0000654 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000655 models.SpecialTask.objects.create(
656 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000657 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000658
659
jadmanski0afbb632008-06-06 21:10:57 +0000660 def _recover_hosts(self):
661 # recover "Repair Failed" hosts
662 message = 'Reverifying dead host %s'
663 self._reverify_hosts_where("status = 'Repair Failed'",
664 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000665
666
showard04c82c52008-05-29 19:38:12 +0000667
showardb95b1bd2008-08-15 18:11:04 +0000668 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000669 # prioritize by job priority, then non-metahost over metahost, then FIFO
jamesrenc44ae992010-02-19 00:12:54 +0000670 return list(scheduler_models.HostQueueEntry.fetch(
showardeab66ce2009-12-23 00:03:56 +0000671 joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000672 where='NOT complete AND NOT active AND status="Queued"',
showardeab66ce2009-12-23 00:03:56 +0000673 order_by='afe_jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000674
675
showard89f84db2009-03-12 20:39:13 +0000676 def _refresh_pending_queue_entries(self):
677 """
678 Lookup the pending HostQueueEntries and call our HostScheduler
679 refresh() method given that list. Return the list.
680
681 @returns A list of pending HostQueueEntries sorted in priority order.
682 """
showard63a34772008-08-18 19:32:50 +0000683 queue_entries = self._get_pending_queue_entries()
684 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000685 return []
showardb95b1bd2008-08-15 18:11:04 +0000686
showard63a34772008-08-18 19:32:50 +0000687 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000688
showard89f84db2009-03-12 20:39:13 +0000689 return queue_entries
690
691
692 def _schedule_atomic_group(self, queue_entry):
693 """
694 Schedule the given queue_entry on an atomic group of hosts.
695
696 Returns immediately if there are insufficient available hosts.
697
698 Creates new HostQueueEntries based off of queue_entry for the
699 scheduled hosts and starts them all running.
700 """
701 # This is a virtual host queue entry representing an entire
702 # atomic group, find a group and schedule their hosts.
703 group_hosts = self._host_scheduler.find_eligible_atomic_group(
704 queue_entry)
705 if not group_hosts:
706 return
showardcbe6f942009-06-17 19:33:49 +0000707
708 logging.info('Expanding atomic group entry %s with hosts %s',
709 queue_entry,
710 ', '.join(host.hostname for host in group_hosts))
jamesren883492a2010-02-12 00:45:18 +0000711
showard89f84db2009-03-12 20:39:13 +0000712 for assigned_host in group_hosts[1:]:
713 # Create a new HQE for every additional assigned_host.
jamesrenc44ae992010-02-19 00:12:54 +0000714 new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000715 new_hqe.save()
jamesren883492a2010-02-12 00:45:18 +0000716 new_hqe.set_host(assigned_host)
717 self._run_queue_entry(new_hqe)
718
719 # The first assigned host uses the original HostQueueEntry
720 queue_entry.set_host(group_hosts[0])
721 self._run_queue_entry(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000722
723
showarda9545c02009-12-18 22:44:26 +0000724 def _schedule_hostless_job(self, queue_entry):
725 self.add_agent_task(HostlessQueueTask(queue_entry))
jamesren47bd7372010-03-13 00:58:17 +0000726 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000727
728
showard89f84db2009-03-12 20:39:13 +0000729 def _schedule_new_jobs(self):
730 queue_entries = self._refresh_pending_queue_entries()
731 if not queue_entries:
732 return
733
Simran Basi3f6717d2012-09-13 15:21:22 -0700734 logging.debug('Processing %d queue_entries', len(queue_entries))
showard63a34772008-08-18 19:32:50 +0000735 for queue_entry in queue_entries:
Simran Basidef92872012-09-20 13:34:34 -0700736 self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
showarde55955f2009-10-07 20:48:58 +0000737 is_unassigned_atomic_group = (
738 queue_entry.atomic_group_id is not None
739 and queue_entry.host_id is None)
jamesren883492a2010-02-12 00:45:18 +0000740
741 if queue_entry.is_hostless():
Simran Basidef92872012-09-20 13:34:34 -0700742 self._log_extra_msg('Scheduling hostless job.')
showarda9545c02009-12-18 22:44:26 +0000743 self._schedule_hostless_job(queue_entry)
jamesren883492a2010-02-12 00:45:18 +0000744 elif is_unassigned_atomic_group:
745 self._schedule_atomic_group(queue_entry)
showarde55955f2009-10-07 20:48:58 +0000746 else:
jamesren883492a2010-02-12 00:45:18 +0000747 assigned_host = self._host_scheduler.schedule_entry(queue_entry)
showard65db3932009-10-28 19:54:35 +0000748 if assigned_host and not self.host_has_agent(assigned_host):
jamesren883492a2010-02-12 00:45:18 +0000749 assert assigned_host.id == queue_entry.host_id
750 self._run_queue_entry(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000751
752
showard8cc058f2009-09-08 16:26:33 +0000753 def _schedule_running_host_queue_entries(self):
showardd1195652009-12-08 22:21:02 +0000754 for agent_task in self._get_queue_entry_agent_tasks():
755 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000756
757
758 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000759 for entry in scheduler_models.HostQueueEntry.fetch(
760 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000761 task = entry.job.schedule_delayed_callback_task(entry)
762 if task:
showardd1195652009-12-08 22:21:02 +0000763 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000764
765
jamesren883492a2010-02-12 00:45:18 +0000766 def _run_queue_entry(self, queue_entry):
Simran Basidef92872012-09-20 13:34:34 -0700767 self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
768 queue_entry)
jamesren883492a2010-02-12 00:45:18 +0000769 queue_entry.schedule_pre_job_tasks()
mblighd5c95802008-03-05 00:33:46 +0000770
771
jadmanski0afbb632008-06-06 21:10:57 +0000772 def _find_aborting(self):
jamesrene7c65cb2010-06-08 20:38:10 +0000773 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000774 for entry in scheduler_models.HostQueueEntry.fetch(
775 where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +0000776 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +0000777 for agent in self.get_agents_for_entry(entry):
778 agent.abort()
779 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000780 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700781 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000782 for job in jobs_to_stop:
783 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000784
785
showard324bf812009-01-20 23:23:38 +0000786 def _can_start_agent(self, agent, num_started_this_cycle,
787 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000788 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000789 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000790 return True
791 # don't allow any nonzero-process agents to run after we've reached a
792 # limit (this avoids starvation of many-process agents)
793 if have_reached_limit:
794 return False
795 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000796 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000797 agent.task.owner_username,
798 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000799 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000800 return False
801 # if a single agent exceeds the per-cycle throttling, still allow it to
802 # run when it's the first agent in the cycle
803 if num_started_this_cycle == 0:
804 return True
805 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +0000806 if (num_started_this_cycle + agent.task.num_processes >
807 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000808 return False
809 return True
810
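    # Worked example of the per-cycle throttle above (numbers are
    # hypothetical): with max_processes_started_per_cycle = 50 and
    # num_started_this_cycle = 45, an agent whose task needs 10 processes is
    # skipped (45 + 10 > 50); it would only be allowed if it were the first
    # agent started this cycle (num_started_this_cycle == 0).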

    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_cycle += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes. %d added this cycle.',
                     _drone_manager.total_running_processes(),
                     num_started_this_cycle)


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


SiteDispatcher = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    pass


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """
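    # A minimal usage sketch (values are hypothetical; the real callers are
    # the AgentTask classes in this module):
    #
    #   monitor = PidfileRunMonitor()
    #   monitor.run(command=_autoserv_command_line('host1', extra_args=[]),
    #               working_directory='path/to/execution/dir', num_processes=1,
    #               nice_level=AUTOSERV_NICE_LEVEL, username='someuser')
    #   ...then, on each scheduler tick...
    #   if monitor.exit_code() is not None:
    #       success = (monitor.exit_code() == 0)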

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, num_processes, nice_level=None,
            log_file=None, pidfile_name=None, paired_with_pidfile=None,
            username=None, drone_hostnames_allowed=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            num_processes=num_processes, log_file=log_file,
            paired_with_pidfile=paired_with_pidfile, username=username,
            drone_hostnames_allowed=drone_hostnames_allowed)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=drone_manager.AUTOSERV_PID_FILE,
                                   num_processes=None):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_path, pidfile_name=pidfile_name)
        if num_processes is not None:
            _drone_manager.declare_process_count(self.pidfile_id, num_processes)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
         pid=None, exit_status=None if autoserv has not yet run
         pid!=None, exit_status=None if autoserv is running
         pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        if time.time() - self._start_time > _get_pidfile_timeout_secs():
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """@returns The number of tests that failed or -1 if unknown."""
        self._get_pidfile_info()
        if self._state.num_tests_failed is None:
            return -1
        return self._state.num_tests_failed


    def try_copy_results_on_drone(self, **kwargs):
        if self.has_process():
            # copy results logs into the normal place for job results
            _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)


    def try_copy_to_results_repository(self, source, **kwargs):
        if self.has_process():
            _drone_manager.copy_to_results_repository(self.get_process(),
                                                      source, **kwargs)


class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state. If the task succeeded.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested. The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """
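    # Hypothetical sketch of a conforming task object (illustration only;
    # concrete tasks such as QueueTask and the special-task classes referenced
    # by the dispatcher build on BaseAgentTask below, and the dispatcher's
    # throttling additionally reads task.num_processes, task.owner_username
    # and task.get_drone_hostnames_allowed()):
    #
    #   class NoopTask(object):
    #       aborted = False
    #       success = None
    #       queue_entry_ids = ()
    #       host_ids = ()
    #       def poll(self):
    #           self.success = True
    #       def is_done(self):
    #           return self.success is not None
    #       def abort(self):
    #           self.aborted = True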
1081
1082
showard418785b2009-11-23 20:19:59 +00001083 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001084 """
showard8cc058f2009-09-08 16:26:33 +00001085 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001086 """
showard8cc058f2009-09-08 16:26:33 +00001087 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001088
showard77182562009-06-10 00:16:05 +00001089 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001090 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001091
showard8cc058f2009-09-08 16:26:33 +00001092 self.queue_entry_ids = task.queue_entry_ids
1093 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001094
showard8cc058f2009-09-08 16:26:33 +00001095 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001096 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001097
1098
jadmanski0afbb632008-06-06 21:10:57 +00001099 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001100 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001101 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001102 self.task.poll()
1103 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001104 self.finished = True
showardec113162008-05-08 00:52:49 +00001105
1106
jadmanski0afbb632008-06-06 21:10:57 +00001107 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001108 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001109
1110
showardd3dc1992009-04-22 21:01:40 +00001111 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001112 if self.task:
1113 self.task.abort()
1114 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001115 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001116 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001117
showardd3dc1992009-04-22 21:01:40 +00001118
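# --- Illustrative sketch only (not used by the scheduler) ---------------------
# A minimal task object satisfying the protocol described in the Agent
# docstring above, driven the way the dispatcher drives an Agent.  Every name
# below is hypothetical and exists purely as an example.
def _example_agent_round_trip():
    class _NoopTask(object):
        def __init__(self):
            self.success = None
            self.aborted = False
            self.queue_entry_ids = []
            self.host_ids = []
            self._polled = False

        def poll(self):
            # a real task would check on its autoserv process here
            self._polled = True
            self.success = True

        def is_done(self):
            return self._polled

        def abort(self):
            self.aborted = True

    agent = Agent(_NoopTask())
    while not agent.is_done():
        agent.tick()        # the dispatcher calls this once per scheduler tick
    return agent.task.success

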
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001119class BaseAgentTask(object):
showardd1195652009-12-08 22:21:02 +00001120 class _NullMonitor(object):
1121 pidfile_id = None
1122
1123 def has_process(self):
1124 return True
1125
1126
1127 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001128 """
showardd1195652009-12-08 22:21:02 +00001129 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001130 """
jadmanski0afbb632008-06-06 21:10:57 +00001131 self.done = False
showardd1195652009-12-08 22:21:02 +00001132 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001133 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001134 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001135 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001136 self.queue_entry_ids = []
1137 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001138 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001139
1140
1141 def _set_ids(self, host=None, queue_entries=None):
1142 if queue_entries and queue_entries != [None]:
1143 self.host_ids = [entry.host.id for entry in queue_entries]
1144 self.queue_entry_ids = [entry.id for entry in queue_entries]
1145 else:
1146 assert host
1147 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001148
1149
jadmanski0afbb632008-06-06 21:10:57 +00001150 def poll(self):
showard08a36412009-05-05 01:01:13 +00001151 if not self.started:
1152 self.start()
showardd1195652009-12-08 22:21:02 +00001153 if not self.done:
1154 self.tick()
showard08a36412009-05-05 01:01:13 +00001155
1156
1157 def tick(self):
showardd1195652009-12-08 22:21:02 +00001158 assert self.monitor
1159 exit_code = self.monitor.exit_code()
1160 if exit_code is None:
1161 return
mbligh36768f02008-02-22 18:28:33 +00001162
showardd1195652009-12-08 22:21:02 +00001163 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001164 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001165
1166
jadmanski0afbb632008-06-06 21:10:57 +00001167 def is_done(self):
1168 return self.done
mbligh36768f02008-02-22 18:28:33 +00001169
1170
jadmanski0afbb632008-06-06 21:10:57 +00001171 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001172 if self.done:
showardd1195652009-12-08 22:21:02 +00001173 assert self.started
showard08a36412009-05-05 01:01:13 +00001174 return
showardd1195652009-12-08 22:21:02 +00001175 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001176 self.done = True
1177 self.success = success
1178 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001179
1180
jadmanski0afbb632008-06-06 21:10:57 +00001181 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001182 """
1183 To be overridden.
1184 """
showarded2afea2009-07-07 20:54:07 +00001185 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001186 self.register_necessary_pidfiles()
1187
1188
1189 def _log_file(self):
1190 if not self._log_file_name:
1191 return None
1192 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001193
mbligh36768f02008-02-22 18:28:33 +00001194
jadmanski0afbb632008-06-06 21:10:57 +00001195 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001196 log_file = self._log_file()
1197 if self.monitor and log_file:
1198 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001199
1200
jadmanski0afbb632008-06-06 21:10:57 +00001201 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001202 """
1203 To be overridden.
1204 """
jadmanski0afbb632008-06-06 21:10:57 +00001205 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001206 logging.info("%s finished with success=%s", type(self).__name__,
1207 self.success)
1208
mbligh36768f02008-02-22 18:28:33 +00001209
1210
jadmanski0afbb632008-06-06 21:10:57 +00001211 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001212 if not self.started:
1213 self.prolog()
1214 self.run()
1215
1216 self.started = True
1217
1218
1219 def abort(self):
1220 if self.monitor:
1221 self.monitor.kill()
1222 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001223 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001224 self.cleanup()
1225
1226
showarded2afea2009-07-07 20:54:07 +00001227 def _get_consistent_execution_path(self, execution_entries):
1228 first_execution_path = execution_entries[0].execution_path()
1229 for execution_entry in execution_entries[1:]:
1230 assert execution_entry.execution_path() == first_execution_path, (
1231 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1232 execution_entry,
1233 first_execution_path,
1234 execution_entries[0]))
1235 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001236
1237
showarded2afea2009-07-07 20:54:07 +00001238 def _copy_results(self, execution_entries, use_monitor=None):
1239 """
1240 @param execution_entries: list of objects with execution_path() method
1241 """
showard6d1c1432009-08-20 23:30:39 +00001242 if use_monitor is not None and not use_monitor.has_process():
1243 return
1244
showarded2afea2009-07-07 20:54:07 +00001245 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001246 if use_monitor is None:
1247 assert self.monitor
1248 use_monitor = self.monitor
1249 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001250 execution_path = self._get_consistent_execution_path(execution_entries)
1251 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001252 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001253
showarda1e74b32009-05-12 17:32:04 +00001254
1255 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001256 for queue_entry in queue_entries:
1257 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001258
1259
mbligh4608b002010-01-05 18:22:35 +00001260 def _archive_results(self, queue_entries):
1261 for queue_entry in queue_entries:
1262 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001263
1264
showardd1195652009-12-08 22:21:02 +00001265 def _command_line(self):
1266 """
1267 Return the command line to run. Must be overridden.
1268 """
1269 raise NotImplementedError
1270
1271
1272 @property
1273 def num_processes(self):
1274 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001275 Return the number of processes forked by this BaseAgentTask's process.
1276 It may only be approximate. To be overridden if necessary.
showardd1195652009-12-08 22:21:02 +00001277 """
1278 return 1
1279
1280
1281 def _paired_with_monitor(self):
1282 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001283 If this BaseAgentTask's process must run on the same machine as some
showardd1195652009-12-08 22:21:02 +00001284 previous process, this method should be overridden to return a
1285 PidfileRunMonitor for that process.
1286 """
1287 return self._NullMonitor()
1288
1289
1290 @property
1291 def owner_username(self):
1292 """
1293 Return login of user responsible for this task. May be None. Must be
1294 overridden.
1295 """
1296 raise NotImplementedError
1297
1298
1299 def _working_directory(self):
1300 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001301 Return the directory where this BaseAgentTask's process executes.
1302 Must be overridden.
showardd1195652009-12-08 22:21:02 +00001303 """
1304 raise NotImplementedError
1305
1306
1307 def _pidfile_name(self):
1308 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001309 Return the name of the pidfile this BaseAgentTask's process uses. To be
showardd1195652009-12-08 22:21:02 +00001310 overridden if necessary.
1311 """
jamesrenc44ae992010-02-19 00:12:54 +00001312 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001313
1314
1315 def _check_paired_results_exist(self):
1316 if not self._paired_with_monitor().has_process():
1317 email_manager.manager.enqueue_notify_email(
1318 'No paired results in task',
1319 'No paired results in task %s at %s'
1320 % (self, self._paired_with_monitor().pidfile_id))
1321 self.finished(False)
1322 return False
1323 return True
1324
1325
1326 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001327 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001328 self.monitor = PidfileRunMonitor()
1329
1330
1331 def run(self):
1332 if not self._check_paired_results_exist():
1333 return
1334
1335 self._create_monitor()
1336 self.monitor.run(
1337 self._command_line(), self._working_directory(),
1338 num_processes=self.num_processes,
1339 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1340 pidfile_name=self._pidfile_name(),
1341 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001342 username=self.owner_username,
1343 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1344
1345
1346 def get_drone_hostnames_allowed(self):
1347 if not models.DroneSet.drone_sets_enabled():
1348 return None
1349
1350 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1351 if not hqes:
1352 # Only special tasks could be missing host queue entries
1353 assert isinstance(self, SpecialAgentTask)
1354 return self._user_or_global_default_drone_set(
1355 self.task, self.task.requested_by)
1356
1357 job_ids = hqes.values_list('job', flat=True).distinct()
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001358 assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
jamesren76fcf192010-04-21 20:39:50 +00001359 "span multiple jobs")
1360
1361 job = models.Job.objects.get(id=job_ids[0])
1362 drone_set = job.drone_set
1363 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001364 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001365
1366 return drone_set.get_drone_hostnames()
1367
1368
1369 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1370 """
1371 Returns the user's default drone set, if present.
1372
1373 Otherwise, returns the global default drone set.
1374 """
1375 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1376 if not user:
1377 logging.warn('%s had no owner; using default drone set',
1378 obj_with_owner)
1379 return default_hostnames
1380 if not user.drone_set:
1381 logging.warn('User %s has no default drone set, using global '
1382 'default', user.login)
1383 return default_hostnames
1384 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001385
1386
1387 def register_necessary_pidfiles(self):
1388 pidfile_id = _drone_manager.get_pidfile_id_from(
1389 self._working_directory(), self._pidfile_name())
1390 _drone_manager.register_pidfile(pidfile_id)
1391
1392 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1393 if paired_pidfile_id:
1394 _drone_manager.register_pidfile(paired_pidfile_id)
1395
1396
1397 def recover(self):
1398 if not self._check_paired_results_exist():
1399 return
1400
1401 self._create_monitor()
1402 self.monitor.attach_to_existing_process(
1403 self._working_directory(), pidfile_name=self._pidfile_name(),
1404 num_processes=self.num_processes)
1405 if not self.monitor.has_process():
1406 # no process to recover; wait to be started normally
1407 self.monitor = None
1408 return
1409
1410 self.started = True
Aviv Keshet225bdfe2013-03-05 10:10:08 -08001411 logging.info('Recovering process %s for %s at %s',
1412 self.monitor.get_process(), type(self).__name__,
1413 self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001414
1415
mbligh4608b002010-01-05 18:22:35 +00001416 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1417 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001418 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001419 for entry in queue_entries:
1420 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001421 raise host_scheduler.SchedulerError(
1422 '%s attempting to start entry with invalid status %s: '
1423 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001424 invalid_host_status = (
1425 allowed_host_statuses is not None
1426 and entry.host.status not in allowed_host_statuses)
1427 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001428 raise host_scheduler.SchedulerError(
1429 '%s attempting to start on queue entry with invalid '
1430 'host status %s: %s'
1431 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001432
1433
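# --- Illustrative sketch only (never instantiated by the scheduler) -----------
# The minimum a concrete task built on BaseAgentTask must supply: a command
# line, a working directory and an owner.  num_processes and _pidfile_name()
# keep their defaults.  The command, directory and owner below are hypothetical
# placeholders, not values the scheduler actually uses.
class _ExampleEchoTask(BaseAgentTask):
    def _command_line(self):
        return ['/bin/echo', 'hello from an example task']

    def _working_directory(self):
        return 'hostless/example_echo_task'

    @property
    def owner_username(self):
        return 'autotest_system'

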
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001434SiteAgentTask = utils.import_site_class(
1435 __file__, 'autotest_lib.scheduler.site_monitor_db',
1436 'SiteAgentTask', BaseAgentTask)
1437
1438class AgentTask(SiteAgentTask):
1439 pass
1440
1441
showardd9205182009-04-27 20:09:55 +00001442class TaskWithJobKeyvals(object):
1443 """AgentTask mixin providing functionality to help with job keyval files."""
1444 _KEYVAL_FILE = 'keyval'
1445 def _format_keyval(self, key, value):
1446 return '%s=%s' % (key, value)
1447
1448
1449 def _keyval_path(self):
1450 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001451 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001452
1453
1454 def _write_keyval_after_job(self, field, value):
1455 assert self.monitor
1456 if not self.monitor.has_process():
1457 return
1458 _drone_manager.write_lines_to_file(
1459 self._keyval_path(), [self._format_keyval(field, value)],
1460 paired_with_process=self.monitor.get_process())
1461
1462
1463 def _job_queued_keyval(self, job):
1464 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1465
1466
1467 def _write_job_finished(self):
1468 self._write_keyval_after_job("job_finished", int(time.time()))
1469
1470
showarddb502762009-09-09 15:31:20 +00001471 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1472 keyval_contents = '\n'.join(self._format_keyval(key, value)
1473 for key, value in keyval_dict.iteritems())
1474 # always end with a newline to allow additional keyvals to be written
1475 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001476 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001477 keyval_contents,
1478 file_path=keyval_path)
1479
1480
1481 def _write_keyvals_before_job(self, keyval_dict):
1482 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1483
1484
1485 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001486 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001487 host.hostname)
1488 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001489 all_labels = [ urllib.quote(label) for label in all_labels ]
showarddb502762009-09-09 15:31:20 +00001490 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1491 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1492
1493
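# --- Illustrative sketch only (not called by the scheduler) -------------------
# Shows the keyval file format the TaskWithJobKeyvals mixin above writes: one
# key=value pair per line, always newline-terminated so further keyvals can be
# appended after the job.  The dictionary values here are hypothetical.
def _example_keyval_contents():
    keyval_dict = {'job_queued': 1262304000, 'host_group_name': 'group0'}
    return '\n'.join('%s=%s' % (key, value)
                     for key, value in keyval_dict.iteritems()) + '\n'

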
showard8cc058f2009-09-08 16:26:33 +00001494class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001495 """
1496 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1497 """
1498
1499 TASK_TYPE = None
1500 host = None
1501 queue_entry = None
1502
showardd1195652009-12-08 22:21:02 +00001503 def __init__(self, task, extra_command_args):
1504 super(SpecialAgentTask, self).__init__()
1505
lmrb7c5d272010-04-16 06:34:04 +00001506 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001507
jamesrenc44ae992010-02-19 00:12:54 +00001508 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001509 self.queue_entry = None
1510 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001511 self.queue_entry = scheduler_models.HostQueueEntry(
1512 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001513
showarded2afea2009-07-07 20:54:07 +00001514 self.task = task
1515 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001516
1517
showard8cc058f2009-09-08 16:26:33 +00001518 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001519 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1520
1521
1522 def _command_line(self):
1523 return _autoserv_command_line(self.host.hostname,
1524 self._extra_command_args,
1525 queue_entry=self.queue_entry)
1526
1527
1528 def _working_directory(self):
1529 return self.task.execution_path()
1530
1531
1532 @property
1533 def owner_username(self):
1534 if self.task.requested_by:
1535 return self.task.requested_by.login
1536 return None
showard8cc058f2009-09-08 16:26:33 +00001537
1538
showarded2afea2009-07-07 20:54:07 +00001539 def prolog(self):
1540 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001541 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001542 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001543
1544
showardde634ee2009-01-30 01:44:24 +00001545 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001546 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001547
showard2fe3f1d2009-07-06 20:19:11 +00001548 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001549 return # don't fail metahost entries, they'll be reassigned
1550
showard2fe3f1d2009-07-06 20:19:11 +00001551 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001552 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001553 return # entry has been aborted
1554
showard2fe3f1d2009-07-06 20:19:11 +00001555 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001556 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001557 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001558 self._write_keyval_after_job(queued_key, queued_time)
1559 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001560
showard8cc058f2009-09-08 16:26:33 +00001561 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001562 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001563 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001564 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001565
showard8cc058f2009-09-08 16:26:33 +00001566 pidfile_id = _drone_manager.get_pidfile_id_from(
1567 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001568 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001569 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001570
1571 if self.queue_entry.job.parse_failed_repair:
1572 self._parse_results([self.queue_entry])
1573 else:
1574 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001575
1576
1577 def cleanup(self):
1578 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001579
1580 # We will consider an aborted task to be "Failed"
1581 self.task.finish(bool(self.success))
1582
showardf85a0b72009-10-07 20:48:45 +00001583 if self.monitor:
1584 if self.monitor.has_process():
1585 self._copy_results([self.task])
1586 if self.monitor.pidfile_id is not None:
1587 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001588
1589
1590class RepairTask(SpecialAgentTask):
1591 TASK_TYPE = models.SpecialTask.Task.REPAIR
1592
1593
showardd1195652009-12-08 22:21:02 +00001594 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001595        """\
 1596        task: the SpecialTask to run; its queue entry (if any) is marked failed if this repair fails.
 1597        """
1598 protection = host_protections.Protection.get_string(
1599 task.host.protection)
1600 # normalize the protection name
1601 protection = host_protections.Protection.get_attr_name(protection)
1602
1603 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001604 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001605
1606 # *don't* include the queue entry in IDs -- if the queue entry is
1607 # aborted, we want to leave the repair task running
1608 self._set_ids(host=self.host)
1609
1610
1611 def prolog(self):
1612 super(RepairTask, self).prolog()
1613 logging.info("repair_task starting")
1614 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001615
1616
jadmanski0afbb632008-06-06 21:10:57 +00001617 def epilog(self):
1618 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001619
jadmanski0afbb632008-06-06 21:10:57 +00001620 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001621 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001622 else:
showard8cc058f2009-09-08 16:26:33 +00001623 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001624 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001625 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001626
1627
showarded2afea2009-07-07 20:54:07 +00001628class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001629 def _copy_to_results_repository(self):
1630 if not self.queue_entry or self.queue_entry.meta_host:
1631 return
1632
1633 self.queue_entry.set_execution_subdir()
1634 log_name = os.path.basename(self.task.execution_path())
1635 source = os.path.join(self.task.execution_path(), 'debug',
1636 'autoserv.DEBUG')
1637 destination = os.path.join(
1638 self.queue_entry.execution_path(), log_name)
1639
1640 self.monitor.try_copy_to_results_repository(
1641 source, destination_path=destination)
1642
1643
showard170873e2009-01-07 00:22:26 +00001644 def epilog(self):
1645 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001646
showard775300b2009-09-09 15:30:50 +00001647 if self.success:
1648 return
showard8fe93b52008-11-18 17:53:22 +00001649
showard775300b2009-09-09 15:30:50 +00001650 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001651
showard775300b2009-09-09 15:30:50 +00001652 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001653 # effectively ignore failure for these hosts
1654 self.success = True
showard775300b2009-09-09 15:30:50 +00001655 return
1656
1657 if self.queue_entry:
1658 self.queue_entry.requeue()
1659
1660 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001661 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001662 queue_entry__id=self.queue_entry.id):
1663 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1664 self._fail_queue_entry()
1665 return
1666
showard9bb960b2009-11-19 01:02:11 +00001667 queue_entry = models.HostQueueEntry.objects.get(
1668 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001669 else:
1670 queue_entry = None
1671
1672 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001673 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001674 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001675 queue_entry=queue_entry,
1676 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001677
showard8fe93b52008-11-18 17:53:22 +00001678
1679class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001680 TASK_TYPE = models.SpecialTask.Task.VERIFY
1681
1682
showardd1195652009-12-08 22:21:02 +00001683 def __init__(self, task):
1684 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001685 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001686
1687
jadmanski0afbb632008-06-06 21:10:57 +00001688 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001689 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001690
showardb18134f2009-03-20 20:52:18 +00001691 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001692 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001693 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1694 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001695
jamesren42318f72010-05-10 23:40:59 +00001696 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00001697 # and there's no need to keep records of other requests.
1698 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00001699 host__id=self.host.id,
1700 task=models.SpecialTask.Task.VERIFY,
jamesren42318f72010-05-10 23:40:59 +00001701 is_active=False, is_complete=False, queue_entry=None)
showarded2afea2009-07-07 20:54:07 +00001702 queued_verifies = queued_verifies.exclude(id=self.task.id)
1703 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00001704
mbligh36768f02008-02-22 18:28:33 +00001705
jadmanski0afbb632008-06-06 21:10:57 +00001706 def epilog(self):
1707 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001708 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001709 if self.queue_entry:
1710 self.queue_entry.on_pending()
1711 else:
1712 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001713
1714
mbligh4608b002010-01-05 18:22:35 +00001715class CleanupTask(PreJobTask):
1716 # note this can also run post-job, but when it does, it's running standalone
1717 # against the host (not related to the job), so it's not considered a
1718 # PostJobTask
1719
1720 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1721
1722
1723 def __init__(self, task, recover_run_monitor=None):
1724 super(CleanupTask, self).__init__(task, ['--cleanup'])
1725 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1726
1727
1728 def prolog(self):
1729 super(CleanupTask, self).prolog()
1730 logging.info("starting cleanup task for host: %s", self.host.hostname)
1731 self.host.set_status(models.Host.Status.CLEANING)
1732 if self.queue_entry:
1733 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1734
1735
1736 def _finish_epilog(self):
1737 if not self.queue_entry or not self.success:
1738 return
1739
1740 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
1741 should_run_verify = (
1742 self.queue_entry.job.run_verify
1743 and self.host.protection != do_not_verify_protection)
1744 if should_run_verify:
1745 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1746 models.SpecialTask.objects.create(
1747 host=models.Host.objects.get(id=self.host.id),
1748 queue_entry=entry,
1749 task=models.SpecialTask.Task.VERIFY)
1750 else:
1751 self.queue_entry.on_pending()
1752
1753
1754 def epilog(self):
1755 super(CleanupTask, self).epilog()
1756
1757 if self.success:
1758 self.host.update_field('dirty', 0)
1759 self.host.set_status(models.Host.Status.READY)
1760
1761 self._finish_epilog()
1762
1763
showarda9545c02009-12-18 22:44:26 +00001764class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
1765 """
1766 Common functionality for QueueTask and HostlessQueueTask
1767 """
1768 def __init__(self, queue_entries):
1769 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001770 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001771 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001772
1773
showard73ec0442009-02-07 02:05:20 +00001774 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001775 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001776
1777
jamesrenc44ae992010-02-19 00:12:54 +00001778 def _write_control_file(self, execution_path):
1779 control_path = _drone_manager.attach_file_to_execution(
1780 execution_path, self.job.control_file)
1781 return control_path
1782
1783
Aviv Keshet308e7362013-05-21 14:43:16 -07001784 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001785 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001786 execution_path = self.queue_entries[0].execution_path()
1787 control_path = self._write_control_file(execution_path)
1788 hostnames = ','.join(entry.host.hostname
1789 for entry in self.queue_entries
1790 if not entry.is_hostless())
1791
1792 execution_tag = self.queue_entries[0].execution_tag()
1793 params = _autoserv_command_line(
1794 hostnames,
1795 ['-P', execution_tag, '-n',
1796 _drone_manager.absolute_path(control_path)],
1797 job=self.job, verbose=False)
1798
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001799 if self.job.is_image_update_job():
1800 params += ['--image', self.job.update_image_path]
1801
jamesrenc44ae992010-02-19 00:12:54 +00001802 return params
showardd1195652009-12-08 22:21:02 +00001803
1804
1805 @property
1806 def num_processes(self):
1807 return len(self.queue_entries)
1808
1809
1810 @property
1811 def owner_username(self):
1812 return self.job.owner
1813
1814
1815 def _working_directory(self):
1816 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001817
1818
jadmanski0afbb632008-06-06 21:10:57 +00001819 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001820 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001821 keyval_dict = self.job.keyval_dict()
1822 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001823 group_name = self.queue_entries[0].get_group_name()
1824 if group_name:
1825 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001826 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001827 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001828 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001829 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001830
1831
showard35162b02009-03-03 02:17:30 +00001832 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001833 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001834 _drone_manager.write_lines_to_file(error_file_path,
1835 [_LOST_PROCESS_ERROR])
1836
1837
showardd3dc1992009-04-22 21:01:40 +00001838 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001839 if not self.monitor:
1840 return
1841
showardd9205182009-04-27 20:09:55 +00001842 self._write_job_finished()
1843
showard35162b02009-03-03 02:17:30 +00001844 if self.monitor.lost_process:
1845 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001846
jadmanskif7fa2cc2008-10-01 14:13:23 +00001847
showardcbd74612008-11-19 21:42:02 +00001848 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001849 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001850 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001851 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001852 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001853
1854
jadmanskif7fa2cc2008-10-01 14:13:23 +00001855 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001856 if not self.monitor or not self.monitor.has_process():
1857 return
1858
jadmanskif7fa2cc2008-10-01 14:13:23 +00001859 # build up sets of all the aborted_by and aborted_on values
1860 aborted_by, aborted_on = set(), set()
1861 for queue_entry in self.queue_entries:
1862 if queue_entry.aborted_by:
1863 aborted_by.add(queue_entry.aborted_by)
1864 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1865 aborted_on.add(t)
1866
1867 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001868 # TODO(showard): this conditional is now obsolete, we just need to leave
1869 # it in temporarily for backwards compatibility over upgrades. delete
1870 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001871 assert len(aborted_by) <= 1
1872 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001873 aborted_by_value = aborted_by.pop()
1874 aborted_on_value = max(aborted_on)
1875 else:
1876 aborted_by_value = 'autotest_system'
1877 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001878
showarda0382352009-02-11 23:36:43 +00001879 self._write_keyval_after_job("aborted_by", aborted_by_value)
1880 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001881
showardcbd74612008-11-19 21:42:02 +00001882 aborted_on_string = str(datetime.datetime.fromtimestamp(
1883 aborted_on_value))
1884 self._write_status_comment('Job aborted by %s on %s' %
1885 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001886
1887
jadmanski0afbb632008-06-06 21:10:57 +00001888 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001889 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001890 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001891 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001892
1893
jadmanski0afbb632008-06-06 21:10:57 +00001894 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001895 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001896 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001897
1898
1899class QueueTask(AbstractQueueTask):
1900 def __init__(self, queue_entries):
1901 super(QueueTask, self).__init__(queue_entries)
1902 self._set_ids(queue_entries=queue_entries)
1903
1904
1905 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001906 self._check_queue_entry_statuses(
1907 self.queue_entries,
1908 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
1909 models.HostQueueEntry.Status.RUNNING),
1910 allowed_host_statuses=(models.Host.Status.PENDING,
1911 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00001912
1913 super(QueueTask, self).prolog()
1914
1915 for queue_entry in self.queue_entries:
1916 self._write_host_keyvals(queue_entry.host)
1917 queue_entry.host.set_status(models.Host.Status.RUNNING)
1918 queue_entry.host.update_field('dirty', 1)
showarda9545c02009-12-18 22:44:26 +00001919
1920
1921 def _finish_task(self):
1922 super(QueueTask, self)._finish_task()
1923
1924 for queue_entry in self.queue_entries:
1925 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00001926 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00001927
1928
mbligh4608b002010-01-05 18:22:35 +00001929class HostlessQueueTask(AbstractQueueTask):
1930 def __init__(self, queue_entry):
1931 super(HostlessQueueTask, self).__init__([queue_entry])
1932 self.queue_entry_ids = [queue_entry.id]
1933
1934
1935 def prolog(self):
1936 self.queue_entries[0].update_field('execution_subdir', 'hostless')
1937 super(HostlessQueueTask, self).prolog()
1938
1939
mbligh4608b002010-01-05 18:22:35 +00001940 def _finish_task(self):
1941 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00001942 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001943
1944
showardd3dc1992009-04-22 21:01:40 +00001945class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00001946 def __init__(self, queue_entries, log_file_name):
1947 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00001948
showardd1195652009-12-08 22:21:02 +00001949 self.queue_entries = queue_entries
1950
showardd3dc1992009-04-22 21:01:40 +00001951 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00001952 self._autoserv_monitor.attach_to_existing_process(
1953 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00001954
showardd1195652009-12-08 22:21:02 +00001955
1956 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00001957 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00001958 return 'true'
1959 return self._generate_command(
1960 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00001961
1962
1963 def _generate_command(self, results_dir):
1964 raise NotImplementedError('Subclasses must override this')
1965
1966
showardd1195652009-12-08 22:21:02 +00001967 @property
1968 def owner_username(self):
1969 return self.queue_entries[0].job.owner
1970
1971
1972 def _working_directory(self):
1973 return self._get_consistent_execution_path(self.queue_entries)
1974
1975
1976 def _paired_with_monitor(self):
1977 return self._autoserv_monitor
1978
1979
showardd3dc1992009-04-22 21:01:40 +00001980 def _job_was_aborted(self):
1981 was_aborted = None
showardd1195652009-12-08 22:21:02 +00001982 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00001983 queue_entry.update_from_database()
1984 if was_aborted is None: # first queue entry
1985 was_aborted = bool(queue_entry.aborted)
1986 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
jamesren17cadd62010-06-16 23:26:55 +00001987 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
1988 for entry in self.queue_entries]
showardd3dc1992009-04-22 21:01:40 +00001989 email_manager.manager.enqueue_notify_email(
jamesren17cadd62010-06-16 23:26:55 +00001990 'Inconsistent abort state',
1991 'Queue entries have inconsistent abort state:\n' +
1992 '\n'.join(entries))
showardd3dc1992009-04-22 21:01:40 +00001993 # don't crash here, just assume true
1994 return True
1995 return was_aborted
1996
1997
showardd1195652009-12-08 22:21:02 +00001998 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00001999 if self._job_was_aborted():
2000 return models.HostQueueEntry.Status.ABORTED
2001
2002 # we'll use a PidfileRunMonitor to read the autoserv exit status
2003 if self._autoserv_monitor.exit_code() == 0:
2004 return models.HostQueueEntry.Status.COMPLETED
2005 return models.HostQueueEntry.Status.FAILED
2006
2007
showardd3dc1992009-04-22 21:01:40 +00002008 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002009 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002010 queue_entry.set_status(status)
2011
2012
2013 def abort(self):
2014 # override AgentTask.abort() to avoid killing the process and ending
2015 # the task. post-job tasks continue when the job is aborted.
2016 pass
2017
2018
mbligh4608b002010-01-05 18:22:35 +00002019 def _pidfile_label(self):
2020 # '.autoserv_execute' -> 'autoserv'
2021 return self._pidfile_name()[1:-len('_execute')]
2022
2023
showard9bb960b2009-11-19 01:02:11 +00002024class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002025 """
2026 Task responsible for
2027 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2028 * copying logs to the results repository
2029 * spawning CleanupTasks for hosts, if necessary
2030 * spawning a FinalReparseTask for the job
2031 """
showardd1195652009-12-08 22:21:02 +00002032 def __init__(self, queue_entries, recover_run_monitor=None):
2033 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002034 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002035 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002036 self._set_ids(queue_entries=queue_entries)
2037
2038
Aviv Keshet308e7362013-05-21 14:43:16 -07002039 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd3dc1992009-04-22 21:01:40 +00002040 def _generate_command(self, results_dir):
2041 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002042 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00002043 return [_autoserv_path , '-p',
2044 '--pidfile-label=%s' % self._pidfile_label(),
2045 '--use-existing-results', '--collect-crashinfo',
2046 '-m', host_list, '-r', results_dir]
showardd3dc1992009-04-22 21:01:40 +00002047
2048
showardd1195652009-12-08 22:21:02 +00002049 @property
2050 def num_processes(self):
2051 return len(self.queue_entries)
2052
2053
2054 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002055 return drone_manager.CRASHINFO_PID_FILE
showardd1195652009-12-08 22:21:02 +00002056
2057
showardd3dc1992009-04-22 21:01:40 +00002058 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002059 self._check_queue_entry_statuses(
2060 self.queue_entries,
2061 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
2062 allowed_host_statuses=(models.Host.Status.RUNNING,))
showard8cc058f2009-09-08 16:26:33 +00002063
showardd3dc1992009-04-22 21:01:40 +00002064 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002065
2066
showardd3dc1992009-04-22 21:01:40 +00002067 def epilog(self):
2068 super(GatherLogsTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002069 self._parse_results(self.queue_entries)
showard9bb960b2009-11-19 01:02:11 +00002070 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002071
showard9bb960b2009-11-19 01:02:11 +00002072
2073 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002074 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002075 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002076 models.HostQueueEntry.Status.COMPLETED)
2077 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2078 else:
2079 final_success = False
2080 num_tests_failed = 0
2081
showard9bb960b2009-11-19 01:02:11 +00002082 reboot_after = self._job.reboot_after
2083 do_reboot = (
2084 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002085 self._final_status() == models.HostQueueEntry.Status.ABORTED
jamesrendd855242010-03-02 22:23:44 +00002086 or reboot_after == model_attributes.RebootAfter.ALWAYS
2087 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
showard9bb960b2009-11-19 01:02:11 +00002088 and final_success and num_tests_failed == 0))
2089
showardd1195652009-12-08 22:21:02 +00002090 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002091 if do_reboot:
2092 # don't pass the queue entry to the CleanupTask. if the cleanup
2093 # fails, the job doesn't care -- it's over.
2094 models.SpecialTask.objects.create(
2095 host=models.Host.objects.get(id=queue_entry.host.id),
2096 task=models.SpecialTask.Task.CLEANUP,
2097 requested_by=self._job.owner_model())
2098 else:
2099 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002100
2101
showard0bbfc212009-04-29 21:06:13 +00002102 def run(self):
showard597bfd32009-05-08 18:22:50 +00002103 autoserv_exit_code = self._autoserv_monitor.exit_code()
2104 # only run if Autoserv exited due to some signal. if we have no exit
2105 # code, assume something bad (and signal-like) happened.
2106 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002107 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002108 else:
2109 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002110
2111
mbligh4608b002010-01-05 18:22:35 +00002112class SelfThrottledPostJobTask(PostJobTask):
2113 """
2114 Special AgentTask subclass that maintains its own global process limit.
2115 """
2116 _num_running_processes = 0
showarded2afea2009-07-07 20:54:07 +00002117
2118
mbligh4608b002010-01-05 18:22:35 +00002119 @classmethod
2120 def _increment_running_processes(cls):
2121 cls._num_running_processes += 1
mbligh16c722d2008-03-05 00:58:44 +00002122
mblighd5c95802008-03-05 00:33:46 +00002123
mbligh4608b002010-01-05 18:22:35 +00002124 @classmethod
2125 def _decrement_running_processes(cls):
2126 cls._num_running_processes -= 1
showard8cc058f2009-09-08 16:26:33 +00002127
2128
mbligh4608b002010-01-05 18:22:35 +00002129 @classmethod
2130 def _max_processes(cls):
2131 raise NotImplementedError
2132
2133
2134 @classmethod
2135 def _can_run_new_process(cls):
2136 return cls._num_running_processes < cls._max_processes()
2137
2138
2139 def _process_started(self):
2140 return bool(self.monitor)
2141
2142
2143 def tick(self):
 2144        # override tick to keep trying to start until the running process count
 2145        # drops below the limit, at which point we revert to default behavior
2146 if self._process_started():
2147 super(SelfThrottledPostJobTask, self).tick()
2148 else:
2149 self._try_starting_process()
2150
2151
2152 def run(self):
2153 # override run() to not actually run unless we can
2154 self._try_starting_process()
2155
2156
2157 def _try_starting_process(self):
2158 if not self._can_run_new_process():
showard775300b2009-09-09 15:30:50 +00002159 return
2160
mbligh4608b002010-01-05 18:22:35 +00002161 # actually run the command
2162 super(SelfThrottledPostJobTask, self).run()
jamesren25663562010-04-27 18:00:55 +00002163 if self._process_started():
2164 self._increment_running_processes()
mblighd5c95802008-03-05 00:33:46 +00002165
mblighd5c95802008-03-05 00:33:46 +00002166
mbligh4608b002010-01-05 18:22:35 +00002167 def finished(self, success):
2168 super(SelfThrottledPostJobTask, self).finished(success)
2169 if self._process_started():
2170 self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002171
showard21baa452008-10-21 00:08:39 +00002172
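# --- Illustrative sketch only (never instantiated by the scheduler) -----------
# The single override SelfThrottledPostJobTask requires for throttling; a real
# task (see FinalReparseTask and ArchiveResultsTask below) also supplies
# _pidfile_name() and _generate_command().  run() refuses to spawn a process
# while the class-wide counter is at the limit, and tick() keeps retrying until
# a slot frees up.  The limit here is a hypothetical constant rather than a
# scheduler_config value.
class _ExampleThrottledTask(SelfThrottledPostJobTask):
    @classmethod
    def _max_processes(cls):
        return 2

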
mbligh4608b002010-01-05 18:22:35 +00002173class FinalReparseTask(SelfThrottledPostJobTask):
showardd1195652009-12-08 22:21:02 +00002174 def __init__(self, queue_entries):
2175 super(FinalReparseTask, self).__init__(queue_entries,
2176 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002177 # don't use _set_ids, since we don't want to set the host_ids
2178 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002179
2180
2181 def _generate_command(self, results_dir):
mbligh4608b002010-01-05 18:22:35 +00002182 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd1195652009-12-08 22:21:02 +00002183 results_dir]
2184
2185
2186 @property
2187 def num_processes(self):
2188 return 0 # don't include parser processes in accounting
2189
2190
2191 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002192 return drone_manager.PARSER_PID_FILE
showardd1195652009-12-08 22:21:02 +00002193
2194
showard97aed502008-11-04 02:01:24 +00002195 @classmethod
mbligh4608b002010-01-05 18:22:35 +00002196 def _max_processes(cls):
2197 return scheduler_config.config.max_parse_processes
showard97aed502008-11-04 02:01:24 +00002198
2199
2200 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002201 self._check_queue_entry_statuses(
2202 self.queue_entries,
2203 allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
showard8cc058f2009-09-08 16:26:33 +00002204
showard97aed502008-11-04 02:01:24 +00002205 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002206
2207
2208 def epilog(self):
2209 super(FinalReparseTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002210 self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002211
2212
mbligh4608b002010-01-05 18:22:35 +00002213class ArchiveResultsTask(SelfThrottledPostJobTask):
showarde1575b52010-01-15 00:21:12 +00002214 _ARCHIVING_FAILED_FILE = '.archiver_failed'
2215
mbligh4608b002010-01-05 18:22:35 +00002216 def __init__(self, queue_entries):
2217 super(ArchiveResultsTask, self).__init__(queue_entries,
2218 log_file_name='.archiving.log')
2219 # don't use _set_ids, since we don't want to set the host_ids
2220 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00002221
2222
mbligh4608b002010-01-05 18:22:35 +00002223 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002224 return drone_manager.ARCHIVER_PID_FILE
showard97aed502008-11-04 02:01:24 +00002225
2226
Aviv Keshet308e7362013-05-21 14:43:16 -07002227 # TODO: Refactor into autoserv_utils. crbug.com/243090
mbligh4608b002010-01-05 18:22:35 +00002228 def _generate_command(self, results_dir):
2229 return [_autoserv_path , '-p',
2230 '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
mblighe0cbc912010-03-11 18:03:07 +00002231 '--use-existing-results', '--control-filename=control.archive',
showard948eb302010-01-15 00:16:20 +00002232 os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
2233 'archive_results.control.srv')]
showard97aed502008-11-04 02:01:24 +00002234
2235
mbligh4608b002010-01-05 18:22:35 +00002236 @classmethod
2237 def _max_processes(cls):
2238 return scheduler_config.config.max_transfer_processes
showarda9545c02009-12-18 22:44:26 +00002239
2240
2241 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002242 self._check_queue_entry_statuses(
2243 self.queue_entries,
2244 allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
2245
2246 super(ArchiveResultsTask, self).prolog()
showarda9545c02009-12-18 22:44:26 +00002247
2248
mbligh4608b002010-01-05 18:22:35 +00002249 def epilog(self):
2250 super(ArchiveResultsTask, self).epilog()
showard4076c632010-01-15 20:28:49 +00002251 if not self.success and self._paired_with_monitor().has_process():
showarde1575b52010-01-15 00:21:12 +00002252 failed_file = os.path.join(self._working_directory(),
2253 self._ARCHIVING_FAILED_FILE)
2254 paired_process = self._paired_with_monitor().get_process()
2255 _drone_manager.write_lines_to_file(
2256 failed_file, ['Archiving failed with exit code %s'
2257 % self.monitor.exit_code()],
2258 paired_with_process=paired_process)
mbligh4608b002010-01-05 18:22:35 +00002259 self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002260
2261
mbligh36768f02008-02-22 18:28:33 +00002262if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002263 main()