#!/usr/bin/python -u
#pylint: disable-msg=C0111

"""
Autotest scheduler
"""


import datetime, optparse, os, signal
import sys, time, traceback, urllib
import logging, gc

import common
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.server import autoserv_utils
from autotest_lib.site_utils.graphite import stats

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_directory = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server')
_autoserv_path = os.path.join(_autoserv_directory, 'autoserv')
_testing_mode = False
_drone_manager = None


def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60


def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
            "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
            _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why something the scheduler launched is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
            logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
                DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host',
            default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    return autoserv_utils.autoserv_run_job_command(_autoserv_directory,
            machines, results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)


class BaseDispatcher(object):


    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)
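
        # The knobs read above live in the SCHEDULER section of global_config;
        # a minimal sketch with illustrative (not default) values:
        #     [SCHEDULER]
        #     gc_stats_interval_mins: 360
        #     tick_debug: True
        #     extra_debugging: False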


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def _log_extra_msg(self, msg):
        if self._extra_debugging:
            logging.debug(msg)


    def tick(self):
        """
        Run one scheduler cycle. This version of tick() is instrumented: it
        logs when each major step begins so we can figure out where most of
        the tick time is being spent.
        """
        timer = stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.refresh().')
        _drone_manager.refresh()
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        # host queue entry statuses handled directly by AgentTasks (Verifying
        # is handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where="status = '%s'" % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is
        # for that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then
        # FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
                joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
                where='NOT complete AND NOT active AND status="Queued"',
                order_by='afe_jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        logging.debug('Processing %d queue_entries', len(queue_entries))
        for queue_entry in queue_entries:
            self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._log_extra_msg('Scheduling hostless job.')
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)


    def _schedule_running_host_queue_entries(self):
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
                            queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()

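    # Worked example of the throttling below (numbers are illustrative): with
    # max_processes_started_per_cycle = 50, an agent needing 10 processes is
    # started only while num_started_this_cycle <= 40; the first agent of a
    # cycle is always allowed, and zero-process agents are never throttled.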
    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True


    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                                        agent.queue_entry_ids))
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached limit of allowed running Agents.')
                    continue
                num_started_this_cycle += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                self._log_extra_msg('Agent finished')
                self.remove_agent(agent)
        logging.info('%d running processes. %d added this cycle.',
                     _drone_manager.total_running_processes(),
                     num_started_this_cycle)


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)
            except Exception as ex:
                logging.exception(ex)
                # TODO: send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0:  # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


SiteDispatcher = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'SiteDispatcher', BaseDispatcher)


class Dispatcher(SiteDispatcher):
    pass


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but
        only used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, num_processes, nice_level=None,
            log_file=None, pidfile_name=None, paired_with_pidfile=None,
            username=None, drone_hostnames_allowed=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
                command, working_directory, pidfile_name=pidfile_name,
                num_processes=num_processes, log_file=log_file,
                paired_with_pidfile=paired_with_pidfile, username=username,
                drone_hostnames_allowed=drone_hostnames_allowed)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=drone_manager.AUTOSERV_PID_FILE,
                                   num_processes=None):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
                execution_path, pidfile_name=pidfile_name)
        if num_processes is not None:
            _drone_manager.declare_process_count(self.pidfile_id, num_processes)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
                'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
                self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
                self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                        'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
            pid=None, exit_status=None if autoserv has not yet run
            pid!=None, exit_status=None if autoserv is running
            pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        if time.time() - self._start_time > _get_pidfile_timeout_secs():
            email_manager.manager.enqueue_notify_email(
                    'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """@returns The number of tests that failed or -1 if unknown."""
        self._get_pidfile_info()
        if self._state.num_tests_failed is None:
            return -1
        return self._state.num_tests_failed


    def try_copy_results_on_drone(self, **kwargs):
        if self.has_process():
            # copy results logs into the normal place for job results
            _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)


    def try_copy_to_results_repository(self, source, **kwargs):
        if self.has_process():
            _drone_manager.copy_to_results_repository(self.get_process(),
                                                      source, **kwargs)


class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested. The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """
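
    # A minimal sketch of a conforming task (illustrative only; the real tasks
    # in this scheduler derive from BaseAgentTask below, and num_processes is
    # consulted by the dispatcher's throttling):
    #     class NoopTask(object):
    #         queue_entry_ids = ()
    #         host_ids = ()
    #         aborted = False
    #         success = None
    #         num_processes = 0
    #         def poll(self):
    #             self.success = True
    #         def is_done(self):
    #             return self.success is not None
    #         def abort(self):
    #             self.aborted = True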
1081
1082
showard418785b2009-11-23 20:19:59 +00001083 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001084 """
showard8cc058f2009-09-08 16:26:33 +00001085 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001086 """
showard8cc058f2009-09-08 16:26:33 +00001087 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001088
        # This is filled in by Dispatcher.add_agent_task()
jadmanski0afbb632008-06-06 21:10:57 +00001090 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001091
showard8cc058f2009-09-08 16:26:33 +00001092 self.queue_entry_ids = task.queue_entry_ids
1093 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001094
showard8cc058f2009-09-08 16:26:33 +00001095 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001096 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001097
1098
jadmanski0afbb632008-06-06 21:10:57 +00001099 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001100 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001101 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001102 self.task.poll()
1103 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001104 self.finished = True
showardec113162008-05-08 00:52:49 +00001105
1106
jadmanski0afbb632008-06-06 21:10:57 +00001107 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001108 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001109
1110
showardd3dc1992009-04-22 21:01:40 +00001111 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001112 if self.task:
1113 self.task.abort()
1114 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001115 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001116 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001117
showardd3dc1992009-04-22 21:01:40 +00001118
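# Illustrative sketch only (not part of the scheduler): a minimal task object
# satisfying the interface documented on Agent above -- poll(), is_done() and
# abort(), plus the aborted/success/queue_entry_ids/host_ids attributes.  The
# _ExampleNoopTask name is hypothetical and exists purely to show how a task
# would be wrapped in an Agent and driven by the dispatcher each tick.
class _ExampleNoopTask(object):
    def __init__(self):
        self.aborted = False
        self.success = None
        self.queue_entry_ids = []
        self.host_ids = []

    def poll(self):
        # pretend to make progress and succeed on the first poll
        if self.success is None and not self.aborted:
            self.success = True

    def is_done(self):
        return self.success is not None

    def abort(self):
        self.aborted = True
        self.success = False

# Hypothetical usage: dispatcher.add_agent(Agent(_ExampleNoopTask())); the
# dispatcher then calls agent.tick() each cycle until agent.is_done().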
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001119class BaseAgentTask(object):
showardd1195652009-12-08 22:21:02 +00001120 class _NullMonitor(object):
1121 pidfile_id = None
1122
1123 def has_process(self):
1124 return True
1125
1126
1127 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001128 """
showardd1195652009-12-08 22:21:02 +00001129 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001130 """
jadmanski0afbb632008-06-06 21:10:57 +00001131 self.done = False
showardd1195652009-12-08 22:21:02 +00001132 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001133 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001134 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001135 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001136 self.queue_entry_ids = []
1137 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001138 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001139
1140
1141 def _set_ids(self, host=None, queue_entries=None):
1142 if queue_entries and queue_entries != [None]:
1143 self.host_ids = [entry.host.id for entry in queue_entries]
1144 self.queue_entry_ids = [entry.id for entry in queue_entries]
1145 else:
1146 assert host
1147 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001148
1149
jadmanski0afbb632008-06-06 21:10:57 +00001150 def poll(self):
showard08a36412009-05-05 01:01:13 +00001151 if not self.started:
1152 self.start()
showardd1195652009-12-08 22:21:02 +00001153 if not self.done:
1154 self.tick()
showard08a36412009-05-05 01:01:13 +00001155
1156
1157 def tick(self):
showardd1195652009-12-08 22:21:02 +00001158 assert self.monitor
1159 exit_code = self.monitor.exit_code()
1160 if exit_code is None:
1161 return
mbligh36768f02008-02-22 18:28:33 +00001162
showardd1195652009-12-08 22:21:02 +00001163 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001164 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001165
1166
jadmanski0afbb632008-06-06 21:10:57 +00001167 def is_done(self):
1168 return self.done
mbligh36768f02008-02-22 18:28:33 +00001169
1170
jadmanski0afbb632008-06-06 21:10:57 +00001171 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001172 if self.done:
showardd1195652009-12-08 22:21:02 +00001173 assert self.started
showard08a36412009-05-05 01:01:13 +00001174 return
showardd1195652009-12-08 22:21:02 +00001175 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001176 self.done = True
1177 self.success = success
1178 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001179
1180
jadmanski0afbb632008-06-06 21:10:57 +00001181 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001182 """
1183 To be overridden.
1184 """
showarded2afea2009-07-07 20:54:07 +00001185 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001186 self.register_necessary_pidfiles()
1187
1188
1189 def _log_file(self):
1190 if not self._log_file_name:
1191 return None
1192 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001193
mbligh36768f02008-02-22 18:28:33 +00001194
jadmanski0afbb632008-06-06 21:10:57 +00001195 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001196 log_file = self._log_file()
1197 if self.monitor and log_file:
1198 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001199
1200
jadmanski0afbb632008-06-06 21:10:57 +00001201 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001202 """
1203 To be overridden.
1204 """
jadmanski0afbb632008-06-06 21:10:57 +00001205 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001206 logging.info("%s finished with success=%s", type(self).__name__,
1207 self.success)
1208
mbligh36768f02008-02-22 18:28:33 +00001209
1210
jadmanski0afbb632008-06-06 21:10:57 +00001211 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001212 if not self.started:
1213 self.prolog()
1214 self.run()
1215
1216 self.started = True
1217
1218
1219 def abort(self):
1220 if self.monitor:
1221 self.monitor.kill()
1222 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001223 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001224 self.cleanup()
1225
1226
showarded2afea2009-07-07 20:54:07 +00001227 def _get_consistent_execution_path(self, execution_entries):
1228 first_execution_path = execution_entries[0].execution_path()
1229 for execution_entry in execution_entries[1:]:
1230 assert execution_entry.execution_path() == first_execution_path, (
1231 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1232 execution_entry,
1233 first_execution_path,
1234 execution_entries[0]))
1235 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001236
1237
showarded2afea2009-07-07 20:54:07 +00001238 def _copy_results(self, execution_entries, use_monitor=None):
1239 """
1240 @param execution_entries: list of objects with execution_path() method
1241 """
showard6d1c1432009-08-20 23:30:39 +00001242 if use_monitor is not None and not use_monitor.has_process():
1243 return
1244
showarded2afea2009-07-07 20:54:07 +00001245 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001246 if use_monitor is None:
1247 assert self.monitor
1248 use_monitor = self.monitor
1249 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001250 execution_path = self._get_consistent_execution_path(execution_entries)
1251 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001252 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001253
showarda1e74b32009-05-12 17:32:04 +00001254
1255 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001256 for queue_entry in queue_entries:
1257 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001258
1259
mbligh4608b002010-01-05 18:22:35 +00001260 def _archive_results(self, queue_entries):
1261 for queue_entry in queue_entries:
1262 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001263
1264
showardd1195652009-12-08 22:21:02 +00001265 def _command_line(self):
1266 """
1267 Return the command line to run. Must be overridden.
1268 """
1269 raise NotImplementedError
1270
1271
1272 @property
1273 def num_processes(self):
1274 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001275 Return the number of processes forked by this BaseAgentTask's process.
1276 It may only be approximate. To be overridden if necessary.
showardd1195652009-12-08 22:21:02 +00001277 """
1278 return 1
1279
1280
1281 def _paired_with_monitor(self):
1282 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001283 If this BaseAgentTask's process must run on the same machine as some
showardd1195652009-12-08 22:21:02 +00001284 previous process, this method should be overridden to return a
1285 PidfileRunMonitor for that process.
1286 """
1287 return self._NullMonitor()
1288
1289
1290 @property
1291 def owner_username(self):
1292 """
1293 Return login of user responsible for this task. May be None. Must be
1294 overridden.
1295 """
1296 raise NotImplementedError
1297
1298
1299 def _working_directory(self):
1300 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001301 Return the directory where this BaseAgentTask's process executes.
1302 Must be overridden.
showardd1195652009-12-08 22:21:02 +00001303 """
1304 raise NotImplementedError
1305
1306
1307 def _pidfile_name(self):
1308 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001309 Return the name of the pidfile this BaseAgentTask's process uses. To be
showardd1195652009-12-08 22:21:02 +00001310 overridden if necessary.
1311 """
jamesrenc44ae992010-02-19 00:12:54 +00001312 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001313
1314
1315 def _check_paired_results_exist(self):
1316 if not self._paired_with_monitor().has_process():
1317 email_manager.manager.enqueue_notify_email(
1318 'No paired results in task',
1319 'No paired results in task %s at %s'
1320 % (self, self._paired_with_monitor().pidfile_id))
1321 self.finished(False)
1322 return False
1323 return True
1324
1325
1326 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001327 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001328 self.monitor = PidfileRunMonitor()
1329
1330
1331 def run(self):
1332 if not self._check_paired_results_exist():
1333 return
1334
1335 self._create_monitor()
1336 self.monitor.run(
1337 self._command_line(), self._working_directory(),
1338 num_processes=self.num_processes,
1339 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1340 pidfile_name=self._pidfile_name(),
1341 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001342 username=self.owner_username,
1343 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1344
1345
1346 def get_drone_hostnames_allowed(self):
1347 if not models.DroneSet.drone_sets_enabled():
1348 return None
1349
1350 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1351 if not hqes:
1352 # Only special tasks could be missing host queue entries
1353 assert isinstance(self, SpecialAgentTask)
1354 return self._user_or_global_default_drone_set(
1355 self.task, self.task.requested_by)
1356
1357 job_ids = hqes.values_list('job', flat=True).distinct()
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001358 assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
jamesren76fcf192010-04-21 20:39:50 +00001359 "span multiple jobs")
1360
1361 job = models.Job.objects.get(id=job_ids[0])
1362 drone_set = job.drone_set
1363 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001364 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001365
1366 return drone_set.get_drone_hostnames()
1367
1368
1369 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1370 """
1371 Returns the hostnames of the user's default drone set, if the user has one.
1372
1373 Otherwise, returns the hostnames of the global default drone set.
1374 """
1375 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1376 if not user:
1377 logging.warn('%s had no owner; using default drone set',
1378 obj_with_owner)
1379 return default_hostnames
1380 if not user.drone_set:
1381 logging.warn('User %s has no default drone set, using global '
1382 'default', user.login)
1383 return default_hostnames
1384 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001385
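    # For reference, the drone-set resolution order implemented above is: the
    # job's own drone_set when present, otherwise the submitting user's
    # default drone set, otherwise the global default drone set; special
    # tasks with no queue entry fall back to the requesting user's default.
    # When drone sets are disabled entirely, None (no restriction) is used.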
1386
1387 def register_necessary_pidfiles(self):
1388 pidfile_id = _drone_manager.get_pidfile_id_from(
1389 self._working_directory(), self._pidfile_name())
1390 _drone_manager.register_pidfile(pidfile_id)
1391
1392 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1393 if paired_pidfile_id:
1394 _drone_manager.register_pidfile(paired_pidfile_id)
1395
1396
1397 def recover(self):
1398 if not self._check_paired_results_exist():
1399 return
1400
1401 self._create_monitor()
1402 self.monitor.attach_to_existing_process(
1403 self._working_directory(), pidfile_name=self._pidfile_name(),
1404 num_processes=self.num_processes)
1405 if not self.monitor.has_process():
1406 # no process to recover; wait to be started normally
1407 self.monitor = None
1408 return
1409
1410 self.started = True
Aviv Keshet225bdfe2013-03-05 10:10:08 -08001411 logging.info('Recovering process %s for %s at %s',
1412 self.monitor.get_process(), type(self).__name__,
1413 self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001414
1415
mbligh4608b002010-01-05 18:22:35 +00001416 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1417 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001418 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001419 for entry in queue_entries:
1420 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001421 raise host_scheduler.SchedulerError(
1422 '%s attempting to start entry with invalid status %s: '
1423 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001424 invalid_host_status = (
1425 allowed_host_statuses is not None
1426 and entry.host.status not in allowed_host_statuses)
1427 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001428 raise host_scheduler.SchedulerError(
1429 '%s attempting to start on queue entry with invalid '
1430 'host status %s: %s'
1431 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001432
1433
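# Hedged sketch (hypothetical, never instantiated by the scheduler): the
# smallest useful BaseAgentTask subclass overrides the three abstract hooks
# documented above -- _command_line(), _working_directory() and
# owner_username -- and lets the base class manage the PidfileRunMonitor,
# pidfile registration and recovery.  The command, directory and user below
# are illustrative values, not real scheduler behaviour.
class _ExampleEchoTask(BaseAgentTask):
    def __init__(self):
        super(_ExampleEchoTask, self).__init__(log_file_name='echo.log')

    def _command_line(self):
        # the command the drone manager would launch on a drone
        return ['echo', 'hello from an example agent task']

    def _working_directory(self):
        return 'example_results/echo'

    @property
    def owner_username(self):
        return 'example-user'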
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001434SiteAgentTask = utils.import_site_class(
1435 __file__, 'autotest_lib.scheduler.site_monitor_db',
1436 'SiteAgentTask', BaseAgentTask)
1437
1438class AgentTask(SiteAgentTask):
1439 pass
1440
1441
showardd9205182009-04-27 20:09:55 +00001442class TaskWithJobKeyvals(object):
1443 """AgentTask mixin providing functionality to help with job keyval files."""
1444 _KEYVAL_FILE = 'keyval'
1445 def _format_keyval(self, key, value):
1446 return '%s=%s' % (key, value)
1447
1448
1449 def _keyval_path(self):
1450 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001451 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001452
1453
1454 def _write_keyval_after_job(self, field, value):
1455 assert self.monitor
1456 if not self.monitor.has_process():
1457 return
1458 _drone_manager.write_lines_to_file(
1459 self._keyval_path(), [self._format_keyval(field, value)],
1460 paired_with_process=self.monitor.get_process())
1461
1462
1463 def _job_queued_keyval(self, job):
1464 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1465
1466
1467 def _write_job_finished(self):
1468 self._write_keyval_after_job("job_finished", int(time.time()))
1469
1470
showarddb502762009-09-09 15:31:20 +00001471 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1472 keyval_contents = '\n'.join(self._format_keyval(key, value)
1473 for key, value in keyval_dict.iteritems())
1474 # always end with a newline to allow additional keyvals to be written
1475 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001476 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001477 keyval_contents,
1478 file_path=keyval_path)
1479
1480
1481 def _write_keyvals_before_job(self, keyval_dict):
1482 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1483
1484
1485 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001486 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001487 host.hostname)
1488 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001489 all_labels = [ urllib.quote(label) for label in all_labels ]
showarddb502762009-09-09 15:31:20 +00001490 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1491 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1492
1493
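# Illustration of the keyval file format produced by TaskWithJobKeyvals above
# (all values hypothetical).  Keyval files are plain "key=value" lines ending
# with a newline so that further keyvals can be appended later, e.g.:
#
#     job_queued=1359331200
#     job_finished=1359334800
#
# A host keyval written by _write_host_keyvals() carries the host's platform
# and its urllib.quote()d, comma-joined labels, e.g.:
#
#     platform=netbook_ALEX
#     labels=netbook_ALEX,pool%3Asuites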
showard8cc058f2009-09-08 16:26:33 +00001494class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001495 """
1496 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1497 """
1498
1499 TASK_TYPE = None
1500 host = None
1501 queue_entry = None
1502
showardd1195652009-12-08 22:21:02 +00001503 def __init__(self, task, extra_command_args):
1504 super(SpecialAgentTask, self).__init__()
1505
lmrb7c5d272010-04-16 06:34:04 +00001506 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001507
jamesrenc44ae992010-02-19 00:12:54 +00001508 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001509 self.queue_entry = None
1510 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001511 self.queue_entry = scheduler_models.HostQueueEntry(
1512 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001513
showarded2afea2009-07-07 20:54:07 +00001514 self.task = task
1515 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001516
1517
showard8cc058f2009-09-08 16:26:33 +00001518 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001519 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1520
1521
1522 def _command_line(self):
1523 return _autoserv_command_line(self.host.hostname,
1524 self._extra_command_args,
1525 queue_entry=self.queue_entry)
1526
1527
1528 def _working_directory(self):
1529 return self.task.execution_path()
1530
1531
1532 @property
1533 def owner_username(self):
1534 if self.task.requested_by:
1535 return self.task.requested_by.login
1536 return None
showard8cc058f2009-09-08 16:26:33 +00001537
1538
showarded2afea2009-07-07 20:54:07 +00001539 def prolog(self):
1540 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001541 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001542 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001543
1544
showardde634ee2009-01-30 01:44:24 +00001545 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001546 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001547
showard2fe3f1d2009-07-06 20:19:11 +00001548 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001549 return # don't fail metahost entries, they'll be reassigned
1550
showard2fe3f1d2009-07-06 20:19:11 +00001551 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001552 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001553 return # entry has been aborted
1554
showard2fe3f1d2009-07-06 20:19:11 +00001555 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001556 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001557 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001558 self._write_keyval_after_job(queued_key, queued_time)
1559 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001560
showard8cc058f2009-09-08 16:26:33 +00001561 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001562 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001563 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001564 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001565
showard8cc058f2009-09-08 16:26:33 +00001566 pidfile_id = _drone_manager.get_pidfile_id_from(
1567 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001568 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001569 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001570
1571 if self.queue_entry.job.parse_failed_repair:
1572 self._parse_results([self.queue_entry])
1573 else:
1574 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001575
1576
1577 def cleanup(self):
1578 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001579
1580 # We will consider an aborted task to be "Failed"
1581 self.task.finish(bool(self.success))
1582
showardf85a0b72009-10-07 20:48:45 +00001583 if self.monitor:
1584 if self.monitor.has_process():
1585 self._copy_results([self.task])
1586 if self.monitor.pidfile_id is not None:
1587 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001588
1589
1590class RepairTask(SpecialAgentTask):
1591 TASK_TYPE = models.SpecialTask.Task.REPAIR
1592
1593
showardd1195652009-12-08 22:21:02 +00001594 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001595 """\
1596 @param task: special task to run; if it has an associated queue entry,
1596 that entry is marked failed when this repair fails.
1597 """
1598 protection = host_protections.Protection.get_string(
1599 task.host.protection)
1600 # normalize the protection name
1601 protection = host_protections.Protection.get_attr_name(protection)
1602
1603 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001604 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001605
1606 # *don't* include the queue entry in IDs -- if the queue entry is
1607 # aborted, we want to leave the repair task running
1608 self._set_ids(host=self.host)
1609
1610
1611 def prolog(self):
1612 super(RepairTask, self).prolog()
1613 logging.info("repair_task starting")
1614 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001615
1616
jadmanski0afbb632008-06-06 21:10:57 +00001617 def epilog(self):
1618 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001619
jadmanski0afbb632008-06-06 21:10:57 +00001620 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001621 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001622 else:
showard8cc058f2009-09-08 16:26:33 +00001623 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001624 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001625 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001626
1627
showarded2afea2009-07-07 20:54:07 +00001628class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001629 def _copy_to_results_repository(self):
1630 if not self.queue_entry or self.queue_entry.meta_host:
1631 return
1632
1633 self.queue_entry.set_execution_subdir()
1634 log_name = os.path.basename(self.task.execution_path())
1635 source = os.path.join(self.task.execution_path(), 'debug',
1636 'autoserv.DEBUG')
1637 destination = os.path.join(
1638 self.queue_entry.execution_path(), log_name)
1639
1640 self.monitor.try_copy_to_results_repository(
1641 source, destination_path=destination)
1642
1643
showard170873e2009-01-07 00:22:26 +00001644 def epilog(self):
1645 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001646
showard775300b2009-09-09 15:30:50 +00001647 if self.success:
1648 return
showard8fe93b52008-11-18 17:53:22 +00001649
showard775300b2009-09-09 15:30:50 +00001650 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001651
showard775300b2009-09-09 15:30:50 +00001652 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001653 # effectively ignore failure for these hosts
1654 self.success = True
showard775300b2009-09-09 15:30:50 +00001655 return
1656
1657 if self.queue_entry:
1658 self.queue_entry.requeue()
1659
1660 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001661 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001662 queue_entry__id=self.queue_entry.id):
1663 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1664 self._fail_queue_entry()
1665 return
1666
showard9bb960b2009-11-19 01:02:11 +00001667 queue_entry = models.HostQueueEntry.objects.get(
1668 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001669 else:
1670 queue_entry = None
1671
1672 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001673 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001674 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001675 queue_entry=queue_entry,
1676 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001677
showard8fe93b52008-11-18 17:53:22 +00001678
Alex Miller42437f92013-05-28 12:58:54 -07001679 def _should_pending(self):
1680 """
1681 Decide if we should call the host queue entry's on_pending method.
1682 We should if:
1683 1) There exists an associated host queue entry.
1684 2) The current special task completed successfully.
1685 3) There do not exist any more special tasks to be run before the
1686 host queue entry starts.
1687
1688 @returns: True if we should call on_pending, False if not.
1689
1690 """
1691 if not self.queue_entry or not self.success:
1692 return False
1693
1694 # We know if this is the last one when we create it, so we could add
1695 # another column to the database to keep track of this information, but
1696 # I expect the overhead of querying here to be minimal.
1697 queue_entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1698 queued = models.SpecialTask.objects.filter(
1699 host__id=self.host.id, is_active=False,
1700 is_complete=False, queue_entry=queue_entry)
1701 queued = queued.exclude(id=self.task.id)
1702 return queued.count() == 0
1703
1704
showard8fe93b52008-11-18 17:53:22 +00001705class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001706 TASK_TYPE = models.SpecialTask.Task.VERIFY
1707
1708
showardd1195652009-12-08 22:21:02 +00001709 def __init__(self, task):
1710 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001711 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001712
1713
jadmanski0afbb632008-06-06 21:10:57 +00001714 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001715 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001716
showardb18134f2009-03-20 20:52:18 +00001717 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001718 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001719 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1720 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001721
jamesren42318f72010-05-10 23:40:59 +00001722 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00001723 # and there's no need to keep records of other requests.
1724 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00001725 host__id=self.host.id,
1726 task=models.SpecialTask.Task.VERIFY,
jamesren42318f72010-05-10 23:40:59 +00001727 is_active=False, is_complete=False, queue_entry=None)
showarded2afea2009-07-07 20:54:07 +00001728 queued_verifies = queued_verifies.exclude(id=self.task.id)
1729 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00001730
mbligh36768f02008-02-22 18:28:33 +00001731
jadmanski0afbb632008-06-06 21:10:57 +00001732 def epilog(self):
1733 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001734 if self.success:
Alex Miller42437f92013-05-28 12:58:54 -07001735 if self._should_pending():
showard8cc058f2009-09-08 16:26:33 +00001736 self.queue_entry.on_pending()
1737 else:
1738 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001739
1740
mbligh4608b002010-01-05 18:22:35 +00001741class CleanupTask(PreJobTask):
1742 # note this can also run post-job, but when it does, it's running standalone
1743 # against the host (not related to the job), so it's not considered a
1744 # PostJobTask
1745
1746 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1747
1748
1749 def __init__(self, task, recover_run_monitor=None):
1750 super(CleanupTask, self).__init__(task, ['--cleanup'])
1751 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1752
1753
1754 def prolog(self):
1755 super(CleanupTask, self).prolog()
1756 logging.info("starting cleanup task for host: %s", self.host.hostname)
1757 self.host.set_status(models.Host.Status.CLEANING)
1758 if self.queue_entry:
1759 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1760
1761
1762 def _finish_epilog(self):
1763 if not self.queue_entry or not self.success:
1764 return
1765
1766 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
1767 should_run_verify = (
1768 self.queue_entry.job.run_verify
1769 and self.host.protection != do_not_verify_protection)
1770 if should_run_verify:
1771 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1772 models.SpecialTask.objects.create(
1773 host=models.Host.objects.get(id=self.host.id),
1774 queue_entry=entry,
1775 task=models.SpecialTask.Task.VERIFY)
1776 else:
Alex Miller42437f92013-05-28 12:58:54 -07001777 if self._should_pending():
1778 self.queue_entry.on_pending()
mbligh4608b002010-01-05 18:22:35 +00001779
1780
1781 def epilog(self):
1782 super(CleanupTask, self).epilog()
1783
1784 if self.success:
1785 self.host.update_field('dirty', 0)
1786 self.host.set_status(models.Host.Status.READY)
1787
1788 self._finish_epilog()
1789
1790
showarda9545c02009-12-18 22:44:26 +00001791class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
1792 """
1793 Common functionality for QueueTask and HostlessQueueTask
1794 """
1795 def __init__(self, queue_entries):
1796 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001797 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001798 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001799
1800
showard73ec0442009-02-07 02:05:20 +00001801 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001802 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001803
1804
jamesrenc44ae992010-02-19 00:12:54 +00001805 def _write_control_file(self, execution_path):
1806 control_path = _drone_manager.attach_file_to_execution(
1807 execution_path, self.job.control_file)
1808 return control_path
1809
1810
Aviv Keshet308e7362013-05-21 14:43:16 -07001811 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001812 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001813 execution_path = self.queue_entries[0].execution_path()
1814 control_path = self._write_control_file(execution_path)
1815 hostnames = ','.join(entry.host.hostname
1816 for entry in self.queue_entries
1817 if not entry.is_hostless())
1818
1819 execution_tag = self.queue_entries[0].execution_tag()
1820 params = _autoserv_command_line(
1821 hostnames,
beepscb6f1e22013-06-28 19:14:10 -07001822 ['-P', execution_tag, '-n', '--verify_job_repo_url',
jamesrenc44ae992010-02-19 00:12:54 +00001823 _drone_manager.absolute_path(control_path)],
1824 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001825 if self.job.is_image_update_job():
1826 params += ['--image', self.job.update_image_path]
1827
jamesrenc44ae992010-02-19 00:12:54 +00001828 return params
showardd1195652009-12-08 22:21:02 +00001829
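    # Note on the command assembled above (a rough description, not an exact
    # transcript): _autoserv_command_line() supplies the autoserv invocation
    # for the job and its hostnames, to which this method adds the execution
    # tag ('-P'), '-n', '--verify_job_repo_url' and the control file path
    # pushed to the drone; image-update jobs also get an extra
    # '--image <path>' pair appended.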
1830
1831 @property
1832 def num_processes(self):
1833 return len(self.queue_entries)
1834
1835
1836 @property
1837 def owner_username(self):
1838 return self.job.owner
1839
1840
1841 def _working_directory(self):
1842 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001843
1844
jadmanski0afbb632008-06-06 21:10:57 +00001845 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001846 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001847 keyval_dict = self.job.keyval_dict()
1848 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001849 group_name = self.queue_entries[0].get_group_name()
1850 if group_name:
1851 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001852 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001853 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001854 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001855 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001856
1857
showard35162b02009-03-03 02:17:30 +00001858 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001859 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001860 _drone_manager.write_lines_to_file(error_file_path,
1861 [_LOST_PROCESS_ERROR])
1862
1863
showardd3dc1992009-04-22 21:01:40 +00001864 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001865 if not self.monitor:
1866 return
1867
showardd9205182009-04-27 20:09:55 +00001868 self._write_job_finished()
1869
showard35162b02009-03-03 02:17:30 +00001870 if self.monitor.lost_process:
1871 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001872
jadmanskif7fa2cc2008-10-01 14:13:23 +00001873
showardcbd74612008-11-19 21:42:02 +00001874 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001875 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001876 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001877 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001878 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001879
1880
jadmanskif7fa2cc2008-10-01 14:13:23 +00001881 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001882 if not self.monitor or not self.monitor.has_process():
1883 return
1884
jadmanskif7fa2cc2008-10-01 14:13:23 +00001885 # build up sets of all the aborted_by and aborted_on values
1886 aborted_by, aborted_on = set(), set()
1887 for queue_entry in self.queue_entries:
1888 if queue_entry.aborted_by:
1889 aborted_by.add(queue_entry.aborted_by)
1890 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1891 aborted_on.add(t)
1892
1893 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001894 # TODO(showard): this conditional is now obsolete, we just need to leave
1895 # it in temporarily for backwards compatibility over upgrades. delete
1896 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001897 assert len(aborted_by) <= 1
1898 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001899 aborted_by_value = aborted_by.pop()
1900 aborted_on_value = max(aborted_on)
1901 else:
1902 aborted_by_value = 'autotest_system'
1903 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001904
showarda0382352009-02-11 23:36:43 +00001905 self._write_keyval_after_job("aborted_by", aborted_by_value)
1906 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001907
showardcbd74612008-11-19 21:42:02 +00001908 aborted_on_string = str(datetime.datetime.fromtimestamp(
1909 aborted_on_value))
1910 self._write_status_comment('Job aborted by %s on %s' %
1911 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001912
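    # The status.log line written above looks roughly like (user and time are
    # hypothetical):
    #     INFO\t----\t----\tJob aborted by some_user on 2013-06-01 12:34:56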
1913
jadmanski0afbb632008-06-06 21:10:57 +00001914 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001915 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001916 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001917 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001918
1919
jadmanski0afbb632008-06-06 21:10:57 +00001920 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001921 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001922 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001923
1924
1925class QueueTask(AbstractQueueTask):
1926 def __init__(self, queue_entries):
1927 super(QueueTask, self).__init__(queue_entries)
1928 self._set_ids(queue_entries=queue_entries)
1929
1930
1931 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001932 self._check_queue_entry_statuses(
1933 self.queue_entries,
1934 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
1935 models.HostQueueEntry.Status.RUNNING),
1936 allowed_host_statuses=(models.Host.Status.PENDING,
1937 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00001938
1939 super(QueueTask, self).prolog()
1940
1941 for queue_entry in self.queue_entries:
1942 self._write_host_keyvals(queue_entry.host)
1943 queue_entry.host.set_status(models.Host.Status.RUNNING)
1944 queue_entry.host.update_field('dirty', 1)
showarda9545c02009-12-18 22:44:26 +00001945
1946
1947 def _finish_task(self):
1948 super(QueueTask, self)._finish_task()
1949
1950 for queue_entry in self.queue_entries:
1951 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00001952 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00001953
1954
mbligh4608b002010-01-05 18:22:35 +00001955class HostlessQueueTask(AbstractQueueTask):
1956 def __init__(self, queue_entry):
1957 super(HostlessQueueTask, self).__init__([queue_entry])
1958 self.queue_entry_ids = [queue_entry.id]
1959
1960
1961 def prolog(self):
1962 self.queue_entries[0].update_field('execution_subdir', 'hostless')
1963 super(HostlessQueueTask, self).prolog()
1964
1965
mbligh4608b002010-01-05 18:22:35 +00001966 def _finish_task(self):
1967 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00001968 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001969
1970
showardd3dc1992009-04-22 21:01:40 +00001971class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00001972 def __init__(self, queue_entries, log_file_name):
1973 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00001974
showardd1195652009-12-08 22:21:02 +00001975 self.queue_entries = queue_entries
1976
showardd3dc1992009-04-22 21:01:40 +00001977 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00001978 self._autoserv_monitor.attach_to_existing_process(
1979 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00001980
showardd1195652009-12-08 22:21:02 +00001981
1982 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00001983 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00001984 return 'true'
1985 return self._generate_command(
1986 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00001987
1988
1989 def _generate_command(self, results_dir):
1990 raise NotImplementedError('Subclasses must override this')
1991
1992
showardd1195652009-12-08 22:21:02 +00001993 @property
1994 def owner_username(self):
1995 return self.queue_entries[0].job.owner
1996
1997
1998 def _working_directory(self):
1999 return self._get_consistent_execution_path(self.queue_entries)
2000
2001
2002 def _paired_with_monitor(self):
2003 return self._autoserv_monitor
2004
2005
showardd3dc1992009-04-22 21:01:40 +00002006 def _job_was_aborted(self):
2007 was_aborted = None
showardd1195652009-12-08 22:21:02 +00002008 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002009 queue_entry.update_from_database()
2010 if was_aborted is None: # first queue entry
2011 was_aborted = bool(queue_entry.aborted)
2012 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
jamesren17cadd62010-06-16 23:26:55 +00002013 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
2014 for entry in self.queue_entries]
showardd3dc1992009-04-22 21:01:40 +00002015 email_manager.manager.enqueue_notify_email(
jamesren17cadd62010-06-16 23:26:55 +00002016 'Inconsistent abort state',
2017 'Queue entries have inconsistent abort state:\n' +
2018 '\n'.join(entries))
showardd3dc1992009-04-22 21:01:40 +00002019 # don't crash here, just assume true
2020 return True
2021 return was_aborted
2022
2023
showardd1195652009-12-08 22:21:02 +00002024 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002025 if self._job_was_aborted():
2026 return models.HostQueueEntry.Status.ABORTED
2027
2028 # we'll use a PidfileRunMonitor to read the autoserv exit status
2029 if self._autoserv_monitor.exit_code() == 0:
2030 return models.HostQueueEntry.Status.COMPLETED
2031 return models.HostQueueEntry.Status.FAILED
2032
2033
showardd3dc1992009-04-22 21:01:40 +00002034 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002035 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002036 queue_entry.set_status(status)
2037
2038
2039 def abort(self):
2040 # override AgentTask.abort() to avoid killing the process and ending
2041 # the task. post-job tasks continue when the job is aborted.
2042 pass
2043
2044
mbligh4608b002010-01-05 18:22:35 +00002045 def _pidfile_label(self):
2046 # '.autoserv_execute' -> 'autoserv'
2047 return self._pidfile_name()[1:-len('_execute')]
2048
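    # e.g. '.autoserv_execute' -> 'autoserv'.  Assuming the usual
    # '.<name>_execute' naming convention in drone_manager, the subclasses
    # below end up with labels such as 'collect_crashinfo', 'parser' and
    # 'archiver'.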
2049
showard9bb960b2009-11-19 01:02:11 +00002050class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002051 """
2052 Task responsible for
2053 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2054 * copying logs to the results repository
2055 * spawning CleanupTasks for hosts, if necessary
2056 * spawning a FinalReparseTask for the job
2057 """
showardd1195652009-12-08 22:21:02 +00002058 def __init__(self, queue_entries, recover_run_monitor=None):
2059 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002060 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002061 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002062 self._set_ids(queue_entries=queue_entries)
2063
2064
Aviv Keshet308e7362013-05-21 14:43:16 -07002065 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd3dc1992009-04-22 21:01:40 +00002066 def _generate_command(self, results_dir):
2067 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002068 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00002069 return [_autoserv_path , '-p',
2070 '--pidfile-label=%s' % self._pidfile_label(),
2071 '--use-existing-results', '--collect-crashinfo',
2072 '-m', host_list, '-r', results_dir]
showardd3dc1992009-04-22 21:01:40 +00002073
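    # For illustration only (hostnames and results path hypothetical), the
    # list above expands to a command along the lines of:
    #
    #   autoserv -p --pidfile-label=collect_crashinfo --use-existing-results \
    #       --collect-crashinfo -m host1,host2 -r /results/123-someuser
    #
    # autoserv is pointed at the existing results directory purely to gather
    # crash information from the hosts; the job itself is not re-run.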
2074
showardd1195652009-12-08 22:21:02 +00002075 @property
2076 def num_processes(self):
2077 return len(self.queue_entries)
2078
2079
2080 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002081 return drone_manager.CRASHINFO_PID_FILE
showardd1195652009-12-08 22:21:02 +00002082
2083
showardd3dc1992009-04-22 21:01:40 +00002084 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002085 self._check_queue_entry_statuses(
2086 self.queue_entries,
2087 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
2088 allowed_host_statuses=(models.Host.Status.RUNNING,))
showard8cc058f2009-09-08 16:26:33 +00002089
showardd3dc1992009-04-22 21:01:40 +00002090 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002091
2092
showardd3dc1992009-04-22 21:01:40 +00002093 def epilog(self):
2094 super(GatherLogsTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002095 self._parse_results(self.queue_entries)
showard9bb960b2009-11-19 01:02:11 +00002096 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002097
showard9bb960b2009-11-19 01:02:11 +00002098
2099 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002100 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002101 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002102 models.HostQueueEntry.Status.COMPLETED)
2103 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2104 else:
2105 final_success = False
2106 num_tests_failed = 0
2107
showard9bb960b2009-11-19 01:02:11 +00002108 reboot_after = self._job.reboot_after
2109 do_reboot = (
2110 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002111 self._final_status() == models.HostQueueEntry.Status.ABORTED
jamesrendd855242010-03-02 22:23:44 +00002112 or reboot_after == model_attributes.RebootAfter.ALWAYS
2113 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
showard9bb960b2009-11-19 01:02:11 +00002114 and final_success and num_tests_failed == 0))
2115
showardd1195652009-12-08 22:21:02 +00002116 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002117 if do_reboot:
2118 # don't pass the queue entry to the CleanupTask. if the cleanup
2119 # fails, the job doesn't care -- it's over.
2120 models.SpecialTask.objects.create(
2121 host=models.Host.objects.get(id=queue_entry.host.id),
2122 task=models.SpecialTask.Task.CLEANUP,
2123 requested_by=self._job.owner_model())
2124 else:
2125 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002126
2127
showard0bbfc212009-04-29 21:06:13 +00002128 def run(self):
showard597bfd32009-05-08 18:22:50 +00002129 autoserv_exit_code = self._autoserv_monitor.exit_code()
2130 # only run if Autoserv exited due to some signal. if we have no exit
2131 # code, assume something bad (and signal-like) happened.
2132 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002133 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002134 else:
2135 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002136
2137
mbligh4608b002010-01-05 18:22:35 +00002138class SelfThrottledPostJobTask(PostJobTask):
2139 """
2140 Special AgentTask subclass that maintains its own global process limit.
2141 """
2142 _num_running_processes = 0
showarded2afea2009-07-07 20:54:07 +00002143
2144
mbligh4608b002010-01-05 18:22:35 +00002145 @classmethod
2146 def _increment_running_processes(cls):
2147 cls._num_running_processes += 1
mbligh16c722d2008-03-05 00:58:44 +00002148
mblighd5c95802008-03-05 00:33:46 +00002149
mbligh4608b002010-01-05 18:22:35 +00002150 @classmethod
2151 def _decrement_running_processes(cls):
2152 cls._num_running_processes -= 1
showard8cc058f2009-09-08 16:26:33 +00002153
2154
mbligh4608b002010-01-05 18:22:35 +00002155 @classmethod
2156 def _max_processes(cls):
2157 raise NotImplementedError
2158
2159
2160 @classmethod
2161 def _can_run_new_process(cls):
2162 return cls._num_running_processes < cls._max_processes()
2163
2164
2165 def _process_started(self):
2166 return bool(self.monitor)
2167
2168
2169 def tick(self):
2170 # override tick to keep trying to start until the process count goes
2171 # down and we can, at which point we revert to default behavior
2172 if self._process_started():
2173 super(SelfThrottledPostJobTask, self).tick()
2174 else:
2175 self._try_starting_process()
2176
2177
2178 def run(self):
2179 # override run() to not actually run unless we can
2180 self._try_starting_process()
2181
2182
2183 def _try_starting_process(self):
2184 if not self._can_run_new_process():
showard775300b2009-09-09 15:30:50 +00002185 return
2186
mbligh4608b002010-01-05 18:22:35 +00002187 # actually run the command
2188 super(SelfThrottledPostJobTask, self).run()
jamesren25663562010-04-27 18:00:55 +00002189 if self._process_started():
2190 self._increment_running_processes()
mblighd5c95802008-03-05 00:33:46 +00002191
mblighd5c95802008-03-05 00:33:46 +00002192
mbligh4608b002010-01-05 18:22:35 +00002193 def finished(self, success):
2194 super(SelfThrottledPostJobTask, self).finished(success)
2195 if self._process_started():
2196 self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002197
showard21baa452008-10-21 00:08:39 +00002198
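# A hedged note on the throttling contract above: _try_starting_process()
# only invokes the parent class's run() while the class-wide counter is below
# _max_processes(), and tick() keeps retrying on every scheduler cycle until
# a slot opens up (freed again in finished()).  A toy subclass would only
# need, for example (hypothetical, not used by the scheduler):
#
#     class _ExampleThrottledTask(SelfThrottledPostJobTask):
#         @classmethod
#         def _max_processes(cls):
#             return 2    # at most two such processes running at once
#
# plus the usual PostJobTask hooks (_generate_command, _pidfile_name).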
mbligh4608b002010-01-05 18:22:35 +00002199class FinalReparseTask(SelfThrottledPostJobTask):
showardd1195652009-12-08 22:21:02 +00002200 def __init__(self, queue_entries):
2201 super(FinalReparseTask, self).__init__(queue_entries,
2202 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002203 # don't use _set_ids, since we don't want to set the host_ids
2204 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002205
2206
2207 def _generate_command(self, results_dir):
mbligh4608b002010-01-05 18:22:35 +00002208 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd1195652009-12-08 22:21:02 +00002209 results_dir]
2210
2211
2212 @property
2213 def num_processes(self):
2214 return 0 # don't include parser processes in accounting
2215
2216
2217 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002218 return drone_manager.PARSER_PID_FILE
showardd1195652009-12-08 22:21:02 +00002219
2220
showard97aed502008-11-04 02:01:24 +00002221 @classmethod
mbligh4608b002010-01-05 18:22:35 +00002222 def _max_processes(cls):
2223 return scheduler_config.config.max_parse_processes
showard97aed502008-11-04 02:01:24 +00002224
2225
2226 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002227 self._check_queue_entry_statuses(
2228 self.queue_entries,
2229 allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
showard8cc058f2009-09-08 16:26:33 +00002230
showard97aed502008-11-04 02:01:24 +00002231 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002232
2233
2234 def epilog(self):
2235 super(FinalReparseTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002236 self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002237
2238
mbligh4608b002010-01-05 18:22:35 +00002239class ArchiveResultsTask(SelfThrottledPostJobTask):
showarde1575b52010-01-15 00:21:12 +00002240 _ARCHIVING_FAILED_FILE = '.archiver_failed'
2241
mbligh4608b002010-01-05 18:22:35 +00002242 def __init__(self, queue_entries):
2243 super(ArchiveResultsTask, self).__init__(queue_entries,
2244 log_file_name='.archiving.log')
2245 # don't use _set_ids, since we don't want to set the host_ids
2246 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00002247
2248
mbligh4608b002010-01-05 18:22:35 +00002249 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002250 return drone_manager.ARCHIVER_PID_FILE
showard97aed502008-11-04 02:01:24 +00002251
2252
Aviv Keshet308e7362013-05-21 14:43:16 -07002253 # TODO: Refactor into autoserv_utils. crbug.com/243090
mbligh4608b002010-01-05 18:22:35 +00002254 def _generate_command(self, results_dir):
2255 return [_autoserv_path , '-p',
2256 '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
mblighe0cbc912010-03-11 18:03:07 +00002257 '--use-existing-results', '--control-filename=control.archive',
showard948eb302010-01-15 00:16:20 +00002258 os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
2259 'archive_results.control.srv')]
showard97aed502008-11-04 02:01:24 +00002260
2261
mbligh4608b002010-01-05 18:22:35 +00002262 @classmethod
2263 def _max_processes(cls):
2264 return scheduler_config.config.max_transfer_processes
showarda9545c02009-12-18 22:44:26 +00002265
2266
2267 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002268 self._check_queue_entry_statuses(
2269 self.queue_entries,
2270 allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
2271
2272 super(ArchiveResultsTask, self).prolog()
showarda9545c02009-12-18 22:44:26 +00002273
2274
mbligh4608b002010-01-05 18:22:35 +00002275 def epilog(self):
2276 super(ArchiveResultsTask, self).epilog()
showard4076c632010-01-15 20:28:49 +00002277 if not self.success and self._paired_with_monitor().has_process():
showarde1575b52010-01-15 00:21:12 +00002278 failed_file = os.path.join(self._working_directory(),
2279 self._ARCHIVING_FAILED_FILE)
2280 paired_process = self._paired_with_monitor().get_process()
2281 _drone_manager.write_lines_to_file(
2282 failed_file, ['Archiving failed with exit code %s'
2283 % self.monitor.exit_code()],
2284 paired_with_process=paired_process)
mbligh4608b002010-01-05 18:22:35 +00002285 self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002286
2287
mbligh36768f02008-02-22 18:28:33 +00002288if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002289 main()