#!/usr/bin/python -u

"""
Autotest scheduler
"""


import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
import itertools, logging, weakref
import common
import MySQLdb
from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.frontend import setup_django_environment
from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import monitor_db_cleanup
from autotest_lib.scheduler import status_server, scheduler_config

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min

_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()


def _site_init_monitor_db_dummy():
    return {}


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
        logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def init():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")

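# Illustrative sketch (machine names and extra args below are made up, not from
# a real config) of what _autoserv_command_line() builds:
#     _autoserv_command_line('host1,host2', ['-P', 'sometag'])
# returns roughly
#     [_autoserv_path, '-p', '-m', 'host1,host2',
#      '-r', drone_manager.WORKING_DIRECTORY, '--verbose', '-P', 'sometag'],
# with '-u <job owner>' and '-l <job name>' inserted before '--verbose' when a
# job or queue_entry is supplied.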
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    autoserv_argv = [_autoserv_path, '-p', '-m', machines,
                     '-r', drone_manager.WORKING_DIRECTORY]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
    if verbose:
        autoserv_argv.append('--verbose')
    return autoserv_argv + extra_args

class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""


class HostScheduler(object):
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)

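    # Illustrative example of the mapping the helper below produces (the ids
    # are made up):
    #   rows = [(1, 10), (1, 11), (2, 10)]
    #   _process_many2many_dict(rows)            -> {1: set([10, 11]), 2: set([10])}
    #   _process_many2many_dict(rows, flip=True) -> {10: set([1, 2]), 11: set([1])}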
    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise. Logs an error and returns one of
        them arbitrarily if more than one atomic group is found in the set of
        labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing. Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling. Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to. We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend. Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts. The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []


class Dispatcher(object):
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

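    # One dispatcher cycle, in order: refresh drone state, run periodic
    # cleanup, handle aborts and recurring runs, schedule delayed, new and
    # in-flight work plus special tasks, tick the agents, then flush drone
    # actions and queued email.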
    def tick(self):
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_new_jobs()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._recover_pending_entries()
        self._check_for_remaining_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _register_pidfiles(self):
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
            where="status IN ('Running', 'Gathering', 'Parsing')")
        special_tasks = models.SpecialTask.objects.filter(is_active=True)
        for execution_entry in itertools.chain(queue_entries, special_tasks):
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                    execution_entry.execution_path(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)


    def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
        run_monitor = PidfileRunMonitor()
        run_monitor.attach_to_existing_process(execution_path,
                                               pidfile_name=pidfile_name)
        if run_monitor.has_process():
            orphans.discard(run_monitor.get_process())
            return run_monitor, '(process %s)' % run_monitor.get_process()
        return None, 'without process'


    def _get_unassigned_entries(self, status):
        for entry in HostQueueEntry.fetch(where="status = '%s'" % status):
            if not self.get_agents_for_entry(entry):
                yield entry


    def _recover_entries_with_status(self, status, orphans, pidfile_name,
                                     recover_entries_fn):
        for queue_entry in self._get_unassigned_entries(status):
            queue_entries = queue_entry.job.get_group_entries(queue_entry)
            run_monitor, process_string = self._get_recovery_run_monitor(
                queue_entry.execution_path(), pidfile_name, orphans)
            if not run_monitor:
                # _schedule_running_host_queue_entries should schedule and
                # recover these entries
                continue

            logging.info('Recovering %s entry %s %s', status.lower(),
                         ', '.join(str(entry) for entry in queue_entries),
                         process_string)
            recover_entries_fn(queue_entry.job, queue_entries, run_monitor)


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_running_entries(self, orphans):
        def recover_entries(job, queue_entries, run_monitor):
            queue_task = QueueTask(job=job, queue_entries=queue_entries,
                                   recover_run_monitor=run_monitor)
            self.add_agent(Agent(task=queue_task,
                                 num_processes=len(queue_entries)))

        self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
                                          orphans, _AUTOSERV_PID_FILE,
                                          recover_entries)


    def _recover_gathering_entries(self, orphans):
        def recover_entries(job, queue_entries, run_monitor):
            gather_task = GatherLogsTask(job, queue_entries,
                                         recover_run_monitor=run_monitor)
            self.add_agent(Agent(gather_task))

        self._recover_entries_with_status(
            models.HostQueueEntry.Status.GATHERING,
            orphans, _CRASHINFO_PID_FILE, recover_entries)


    def _recover_parsing_entries(self, orphans):
        def recover_entries(job, queue_entries, run_monitor):
            reparse_task = FinalReparseTask(queue_entries,
                                            recover_run_monitor=run_monitor)
            self.add_agent(Agent(reparse_task, num_processes=0))

        self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
                                          orphans, _PARSER_PID_FILE,
                                          recover_entries)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            entry.on_pending()


    def _recover_all_recoverable_entries(self):
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._recover_special_tasks(orphans)
        self._check_for_remaining_orphan_processes(orphans)


    def _recover_special_tasks(self, orphans):
        """\
        Recovers all special tasks that have started running but have not
        completed.
        """

        tasks = models.SpecialTask.objects.filter(is_active=True,
                                                  is_complete=False)
        # Use ordering to force NULL queue_entry_id's to the end of the list
        for task in tasks.order_by('-queue_entry__id'):
            if self.host_has_agent(task.host):
                raise SchedulerError(
                    "%s already has a host agent %s." % (
                        task, self._host_agents.get(task.host.id)))

            run_monitor, process_string = self._get_recovery_run_monitor(
                task.execution_path(), _AUTOSERV_PID_FILE, orphans)

            logging.info('Recovering %s %s', task, process_string)
            self._recover_special_task(task, run_monitor)


    def _recover_special_task(self, task, run_monitor):
        """\
        Recovers a single special task.
        """
        if task.task == models.SpecialTask.Task.VERIFY:
            agent_task = self._recover_verify(task, run_monitor)
        elif task.task == models.SpecialTask.Task.REPAIR:
            agent_task = self._recover_repair(task, run_monitor)
        elif task.task == models.SpecialTask.Task.CLEANUP:
            agent_task = self._recover_cleanup(task, run_monitor)
        else:
            # Should never happen; log it and skip the task rather than fall
            # through to add_agent() with agent_task unset.
            logging.error(
                "Special task id %d had invalid task %s", task.id, task.task)
            return

        self.add_agent(Agent(agent_task))


    def _recover_verify(self, task, run_monitor):
        """\
        Recovers a verify task.
        No associated queue entry: Verify host
        With associated queue entry: Verify host, and run associated queue
                                     entry
        """
        return VerifyTask(task=task, recover_run_monitor=run_monitor)


    def _recover_repair(self, task, run_monitor):
        """\
        Recovers a repair task.
        Always repair host
        """
        return RepairTask(task=task, recover_run_monitor=run_monitor)


    def _recover_cleanup(self, task, run_monitor):
        """\
        Recovers a cleanup task.
        No associated queue entry: Clean host
        With associated queue entry: Clean host, verify host if needed, and
                                     run associated queue entry
        """
        return CleanupTask(task=task, recover_run_monitor=run_monitor)


    def _check_for_remaining_active_entries(self):
        queue_entries = HostQueueEntry.fetch(
            where='active AND NOT complete AND status NOT IN '
                  '("Starting", "Gathering", "Pending")')

        unrecovered_active_hqes = [entry for entry in queue_entries
                                   if not self.get_agents_for_entry(entry) and
                                   not self._host_has_scheduled_special_task(
                                       entry.host)]
        if unrecovered_active_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_active_hqes)
            raise SchedulerError(
                '%d unrecovered active host queue entries:\n%s' %
                (len(unrecovered_active_hqes), message))


    def _schedule_special_tasks(self):
        tasks = models.SpecialTask.objects.filter(is_active=False,
                                                  is_complete=False,
                                                  host__locked=False)
        # We want lower ids to come first, but the NULL queue_entry_ids need to
        # come last
        tasks = tasks.extra(select={'isnull' : 'queue_entry_id IS NULL'})
        tasks = tasks.extra(order_by=['isnull', 'id'])

        for task in tasks:
            if self.host_has_agent(task.host):
                continue

            if task.task == models.SpecialTask.Task.CLEANUP:
                agent_task = CleanupTask(task=task)
            elif task.task == models.SpecialTask.Task.VERIFY:
                agent_task = VerifyTask(task=task)
            elif task.task == models.SpecialTask.Task.REPAIR:
                agent_task = RepairTask(task=task)
            else:
                email_manager.manager.enqueue_notify_email(
                    'Special task with invalid task', task)
                continue

            self.add_agent(Agent(agent_task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
            "status IN ('Repairing', 'Verifying', 'Cleaning')",
            print_message=message)

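    # For example, the _reverify_hosts_where("status = 'Repair Failed'") call
    # in _recover_hosts() below expands to the SQL condition
    # "locked = 0 AND invalid = 0 AND status = 'Repair Failed'".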
    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                task=models.SpecialTask.Task.CLEANUP,
                host=models.Host(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
            queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))
        # The first assigned host uses the original HostQueueEntry
        group_queue_entries = [queue_entry]
        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            group_queue_entries.append(new_hqe)
        assert len(group_queue_entries) == len(group_hosts)
        for queue_entry, host in itertools.izip(group_queue_entries,
                                                group_hosts):
            self._run_queue_entry(queue_entry, host)


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        for queue_entry in queue_entries:
            if (queue_entry.atomic_group_id is None or
                queue_entry.host_id is not None):
                assigned_host = self._host_scheduler.find_eligible_host(
                    queue_entry)
                if assigned_host:
                    self._run_queue_entry(queue_entry, assigned_host)
            else:
                self._schedule_atomic_group(queue_entry)


    def _schedule_running_host_queue_entries(self):
        entries = HostQueueEntry.fetch(
            where="status IN "
                  "('Starting', 'Running', 'Gathering', 'Parsing')")
        for entry in entries:
            if self.get_agents_for_entry(entry):
                continue

            task_entries = entry.job.get_group_entries(entry)
            for task_entry in task_entries:
                if (task_entry.status != models.HostQueueEntry.Status.PARSING
                        and self.host_has_agent(task_entry.host)):
                    agent = self._host_agents.get(task_entry.host.id)[0]
                    raise SchedulerError('Attempted to schedule on host that '
                                         'already has agent: %s (previous '
                                         'agent task: %s)'
                                         % (task_entry, agent.task))

            if entry.status in (models.HostQueueEntry.Status.STARTING,
                                models.HostQueueEntry.Status.RUNNING):
                params = entry.job.get_autoserv_params(task_entries)
                agent_task = QueueTask(job=entry.job,
                                       queue_entries=task_entries, cmd=params)
            elif entry.status == models.HostQueueEntry.Status.GATHERING:
                agent_task = GatherLogsTask(
                    job=entry.job, queue_entries=task_entries)
            elif entry.status == models.HostQueueEntry.Status.PARSING:
                agent_task = FinalReparseTask(queue_entries=task_entries)
            else:
                raise SchedulerError('_schedule_running_host_queue_entries got '
                                     'entry with invalid status %s: %s'
                                     % (entry.status, entry))

            self.add_agent(Agent(agent_task, num_processes=len(task_entries)))


    def _schedule_delay_tasks(self):
        for entry in HostQueueEntry.fetch(where="status = 'Waiting'"):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent(Agent(task, num_processes=0))


    def _run_queue_entry(self, queue_entry, host):
        queue_entry.schedule_pre_job_tasks(assigned_host=host)


    def _find_aborting(self):
        for entry in HostQueueEntry.fetch(where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)

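    # Throttling sketch for the check below (the numbers are illustrative, not
    # defaults): with max_processes_started_per_cycle = 50 and 40 processes
    # already started this cycle, an agent needing 20 processes is deferred
    # unless it is the first agent started this cycle; zero-process agents
    # always run.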
showard324bf812009-01-20 23:23:38 +00001100 def _can_start_agent(self, agent, num_started_this_cycle,
1101 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001102 # always allow zero-process agents to run
1103 if agent.num_processes == 0:
1104 return True
1105 # don't allow any nonzero-process agents to run after we've reached a
1106 # limit (this avoids starvation of many-process agents)
1107 if have_reached_limit:
1108 return False
1109 # total process throttling
showard324bf812009-01-20 23:23:38 +00001110 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +00001111 return False
1112 # if a single agent exceeds the per-cycle throttling, still allow it to
1113 # run when it's the first agent in the cycle
1114 if num_started_this_cycle == 0:
1115 return True
1116 # per-cycle throttling
1117 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +00001118 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001119 return False
1120 return True
1121
1122
jadmanski0afbb632008-06-06 21:10:57 +00001123 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001124 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001125 have_reached_limit = False
1126 # iterate over copy, so we can remove agents during iteration
1127 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001128 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001129 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001130 have_reached_limit):
1131 have_reached_limit = True
1132 continue
showard4c5374f2008-09-04 17:02:56 +00001133 num_started_this_cycle += agent.num_processes
1134 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001135 if agent.is_done():
1136 logging.info("agent finished")
1137 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001138 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001139 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001140
1141
showard29f7cd22009-04-29 21:16:24 +00001142 def _process_recurring_runs(self):
1143 recurring_runs = models.RecurringRun.objects.filter(
1144 start_date__lte=datetime.datetime.now())
1145 for rrun in recurring_runs:
1146 # Create job from template
1147 job = rrun.job
1148 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001149 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001150
1151 host_objects = info['hosts']
1152 one_time_hosts = info['one_time_hosts']
1153 metahost_objects = info['meta_hosts']
1154 dependencies = info['dependencies']
1155 atomic_group = info['atomic_group']
1156
1157 for host in one_time_hosts or []:
1158 this_host = models.Host.create_one_time_host(host.hostname)
1159 host_objects.append(this_host)
1160
1161 try:
1162 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001163 options=options,
showard29f7cd22009-04-29 21:16:24 +00001164 host_objects=host_objects,
1165 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001166 atomic_group=atomic_group)
1167
1168 except Exception, ex:
1169 logging.exception(ex)
1170 #TODO send email
1171
1172 if rrun.loop_count == 1:
1173 rrun.delete()
1174 else:
1175 if rrun.loop_count != 0: # if not infinite loop
1176 # calculate new start_date
1177 difference = datetime.timedelta(seconds=rrun.loop_period)
1178 rrun.start_date = rrun.start_date + difference
1179 rrun.loop_count -= 1
1180 rrun.save()
1181
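    # Illustrative bookkeeping for the recurring runs above: a RecurringRun
    # with loop_period=3600 and loop_count=3 that fires at 12:00 is
    # rescheduled for 13:00 with loop_count=2; when loop_count reaches 1 the
    # row is deleted instead, and loop_count=0 is treated as "recur forever"
    # (its start_date is left unchanged).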
1182
showard170873e2009-01-07 00:22:26 +00001183class PidfileRunMonitor(object):
1184 """
1185 Client must call either run() to start a new process or
1186 attach_to_existing_process().
1187 """
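    # Typical usage, as a sketch only (the command and execution path are
    # hypothetical placeholders, not the scheduler's real invocations):
    #   monitor = PidfileRunMonitor()
    #   monitor.run(command, working_directory='123-owner/host1',
    #               nice_level=AUTOSERV_NICE_LEVEL,
    #               pidfile_name=_AUTOSERV_PID_FILE)
    #   ...
    #   if monitor.exit_code() is not None:
    #       success = (monitor.exit_code() == 0)
    # or, after a scheduler restart, re-attach to an already-running process:
    #   monitor = PidfileRunMonitor()
    #   monitor.attach_to_existing_process('123-owner/host1')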
mbligh36768f02008-02-22 18:28:33 +00001188
showard170873e2009-01-07 00:22:26 +00001189 class _PidfileException(Exception):
1190 """
1191 Raised when there's some unexpected behavior with the pid file, but only
1192 used internally (never allowed to escape this class).
1193 """
mbligh36768f02008-02-22 18:28:33 +00001194
1195
showard170873e2009-01-07 00:22:26 +00001196 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001197 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001198 self._start_time = None
1199 self.pidfile_id = None
1200 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001201
1202
showard170873e2009-01-07 00:22:26 +00001203 def _add_nice_command(self, command, nice_level):
1204 if not nice_level:
1205 return command
1206 return ['nice', '-n', str(nice_level)] + command
1207
1208
1209 def _set_start_time(self):
1210 self._start_time = time.time()
1211
1212
1213 def run(self, command, working_directory, nice_level=None, log_file=None,
1214 pidfile_name=None, paired_with_pidfile=None):
1215 assert command is not None
1216 if nice_level is not None:
1217 command = ['nice', '-n', str(nice_level)] + command
1218 self._set_start_time()
1219 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001220 command, working_directory, pidfile_name=pidfile_name,
1221 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001222
1223
showarded2afea2009-07-07 20:54:07 +00001224 def attach_to_existing_process(self, execution_path,
showardd3dc1992009-04-22 21:01:40 +00001225 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001226 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001227 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001228 execution_path, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001229 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001230
1231
jadmanski0afbb632008-06-06 21:10:57 +00001232 def kill(self):
showard170873e2009-01-07 00:22:26 +00001233 if self.has_process():
1234 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001235
mbligh36768f02008-02-22 18:28:33 +00001236
showard170873e2009-01-07 00:22:26 +00001237 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001238 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001239 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001240
1241
showard170873e2009-01-07 00:22:26 +00001242 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001243 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001244 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001245 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001246
1247
showard170873e2009-01-07 00:22:26 +00001248 def _read_pidfile(self, use_second_read=False):
1249 assert self.pidfile_id is not None, (
1250 'You must call run() or attach_to_existing_process()')
1251 contents = _drone_manager.get_pidfile_contents(
1252 self.pidfile_id, use_second_read=use_second_read)
1253 if contents.is_invalid():
1254 self._state = drone_manager.PidfileContents()
1255 raise self._PidfileException(contents)
1256 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001257
1258
showard21baa452008-10-21 00:08:39 +00001259 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001260 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1261 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001262 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001263 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001264
1265
1266 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001267 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001268 return
mblighbb421852008-03-11 22:36:16 +00001269
showard21baa452008-10-21 00:08:39 +00001270 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001271
showard170873e2009-01-07 00:22:26 +00001272 if self._state.process is None:
1273 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001274 return
mbligh90a549d2008-03-25 23:52:34 +00001275
showard21baa452008-10-21 00:08:39 +00001276 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001277 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001278 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001279 return
mbligh90a549d2008-03-25 23:52:34 +00001280
showard170873e2009-01-07 00:22:26 +00001281 # pid but no running process - maybe process *just* exited
1282 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001283 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001284 # autoserv exited without writing an exit code
1285 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001286 self._handle_pidfile_error(
1287 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001288
showard21baa452008-10-21 00:08:39 +00001289
1290 def _get_pidfile_info(self):
1291 """\
1292 After completion, self._state will contain:
1293         process=None, exit_status=None if autoserv has not yet run
1294         process!=None, exit_status=None if autoserv is running
1295         process!=None, exit_status!=None if autoserv has completed
1296 """
1297 try:
1298 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001299 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001300 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001301
1302
showard170873e2009-01-07 00:22:26 +00001303 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001304 """\
1305 Called when no pidfile is found or no pid is in the pidfile.
1306 """
showard170873e2009-01-07 00:22:26 +00001307 message = 'No pid found at %s' % self.pidfile_id
showard170873e2009-01-07 00:22:26 +00001308 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1309 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001310 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001311 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001312
1313
showard35162b02009-03-03 02:17:30 +00001314 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001315 """\
1316 Called when autoserv has exited without writing an exit status,
1317 or we've timed out waiting for autoserv to write a pid to the
1318 pidfile. In either case, we just return failure and the caller
1319 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001320
showard170873e2009-01-07 00:22:26 +00001321 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001322 """
1323 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001324 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001325 self._state.exit_status = 1
1326 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001327
1328
jadmanski0afbb632008-06-06 21:10:57 +00001329 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001330 self._get_pidfile_info()
1331 return self._state.exit_status
1332
1333
1334 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001335 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001336 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001337 if self._state.num_tests_failed is None:
1338 return -1
showard21baa452008-10-21 00:08:39 +00001339 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001340
1341
showardcdaeae82009-08-31 18:32:48 +00001342 def try_copy_results_on_drone(self, **kwargs):
1343 if self.has_process():
1344 # copy results logs into the normal place for job results
1345 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1346
1347
1348 def try_copy_to_results_repository(self, source, **kwargs):
1349 if self.has_process():
1350 _drone_manager.copy_to_results_repository(self.get_process(),
1351 source, **kwargs)
1352
1353
mbligh36768f02008-02-22 18:28:33 +00001354class Agent(object):
showard77182562009-06-10 00:16:05 +00001355 """
showard8cc058f2009-09-08 16:26:33 +00001356 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001357
1358 The following methods are required on all task objects:
1359 poll() - Called periodically to let the task check its status and
1360                update its internal state.
1361 is_done() - Returns True if the task is finished.
1362 abort() - Called when an abort has been requested. The task must
1363 set its aborted attribute to True if it actually aborted.
1364
1365 The following attributes are required on all task objects:
1366 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001367 success - bool, True if this task succeeded.
1368 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1369 host_ids - A sequence of Host ids this task represents.
1370
1371 The following attribute is written to all task objects:
1372 agent - A reference to the Agent instance that the task has been
1373 added to.
1374 """
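    # Minimal sketch of an object satisfying the task interface described
    # above (illustrative only; the real tasks in this module are AgentTask
    # subclasses and DelayedCallTask):
    #   class NoopTask(object):
    #       def __init__(self):
    #           self.aborted = False
    #           self.success = False
    #           self.queue_entry_ids = ()
    #           self.host_ids = ()
    #           self.agent = None  # filled in when handed to an Agent
    #       def poll(self):
    #           self.success = True
    #       def is_done(self):
    #           return self.success or self.aborted
    #       def abort(self):
    #           self.aborted = True
    #   agent = Agent(NoopTask(), num_processes=0)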
1375
1376
showard8cc058f2009-09-08 16:26:33 +00001377 def __init__(self, task, num_processes=1):
showard77182562009-06-10 00:16:05 +00001378 """
showard8cc058f2009-09-08 16:26:33 +00001379 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001380 @param num_processes: The number of subprocesses the Agent represents.
1381 This is used by the Dispatcher for managing the load on the
1382 system. Defaults to 1.
1383 """
showard8cc058f2009-09-08 16:26:33 +00001384 self.task = task
1385 task.agent = self
1386
showard77182562009-06-10 00:16:05 +00001387 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001388 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001389 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001390
showard8cc058f2009-09-08 16:26:33 +00001391 self.queue_entry_ids = task.queue_entry_ids
1392 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001393
showard8cc058f2009-09-08 16:26:33 +00001394 self.started = False
mbligh36768f02008-02-22 18:28:33 +00001395
1396
jadmanski0afbb632008-06-06 21:10:57 +00001397 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001398 self.started = True
1399 if self.task:
1400 self.task.poll()
1401 if self.task.is_done():
1402 self.task = None
showardec113162008-05-08 00:52:49 +00001403
1404
jadmanski0afbb632008-06-06 21:10:57 +00001405 def is_done(self):
showard8cc058f2009-09-08 16:26:33 +00001406 return self.task is None
mbligh36768f02008-02-22 18:28:33 +00001407
1408
showardd3dc1992009-04-22 21:01:40 +00001409 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001410 if self.task:
1411 self.task.abort()
1412 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001413 # tasks can choose to ignore aborts
showard8cc058f2009-09-08 16:26:33 +00001414 self.task = None
showard20f9bdd2009-04-29 19:48:33 +00001415
showardd3dc1992009-04-22 21:01:40 +00001416
showard77182562009-06-10 00:16:05 +00001417class DelayedCallTask(object):
1418 """
1419 A task object like AgentTask for an Agent to run that waits for the
1420 specified amount of time to have elapsed before calling the supplied
1421 callback once and finishing. If the callback returns anything, it is
1422 assumed to be a new Agent instance and will be added to the dispatcher.
1423
1424 @attribute end_time: The absolute posix time after which this task will
1425 call its callback when it is polled and be finished.
1426
1427 Also has all attributes required by the Agent class.
1428 """
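    # Usage sketch (illustrative only): run some_callback roughly 30 seconds
    # from now by wrapping the task in a zero-process Agent and handing it to
    # the dispatcher; some_callback and dispatcher are hypothetical names for
    # whatever callable and Dispatcher instance the caller holds.
    #   task = DelayedCallTask(delay_seconds=30, callback=some_callback)
    #   dispatcher.add_agent(Agent(task, num_processes=0))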
1429 def __init__(self, delay_seconds, callback, now_func=None):
1430 """
1431 @param delay_seconds: The delay in seconds from now that this task
1432 will call the supplied callback and be done.
1433 @param callback: A callable to be called by this task once after at
1434 least delay_seconds time has elapsed. It must return None
1435 or a new Agent instance.
1436 @param now_func: A time.time like function. Default: time.time.
1437 Used for testing.
1438 """
1439 assert delay_seconds > 0
1440 assert callable(callback)
1441 if not now_func:
1442 now_func = time.time
1443 self._now_func = now_func
1444 self._callback = callback
1445
1446 self.end_time = self._now_func() + delay_seconds
1447
1448 # These attributes are required by Agent.
1449 self.aborted = False
showard77182562009-06-10 00:16:05 +00001450 self.host_ids = ()
1451 self.success = False
1452 self.queue_entry_ids = ()
1453        # This is filled in by the Agent constructor (task.agent = self).
1454 self.agent = None
1455
1456
1457 def poll(self):
showard8cc058f2009-09-08 16:26:33 +00001458 if not self.is_done() and self._now_func() >= self.end_time:
1459 self._callback()
showard77182562009-06-10 00:16:05 +00001460 self.success = True
1461
1462
1463 def is_done(self):
showard8cc058f2009-09-08 16:26:33 +00001464 return self.success or self.aborted
showard77182562009-06-10 00:16:05 +00001465
1466
1467 def abort(self):
1468 self.aborted = True
showard77182562009-06-10 00:16:05 +00001469
1470
mbligh36768f02008-02-22 18:28:33 +00001471class AgentTask(object):
showard8cc058f2009-09-08 16:26:33 +00001472 def __init__(self, cmd=None, working_directory=None,
showarded2afea2009-07-07 20:54:07 +00001473 pidfile_name=None, paired_with_pidfile=None,
1474 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001475 self.done = False
jadmanski0afbb632008-06-06 21:10:57 +00001476 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001477 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001478 self.agent = None
showarded2afea2009-07-07 20:54:07 +00001479 self.monitor = recover_run_monitor
1480 self.started = bool(recover_run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +00001481 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001482 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001483 self.queue_entry_ids = []
1484 self.host_ids = []
1485 self.log_file = None
1486
1487
1488 def _set_ids(self, host=None, queue_entries=None):
1489 if queue_entries and queue_entries != [None]:
1490 self.host_ids = [entry.host.id for entry in queue_entries]
1491 self.queue_entry_ids = [entry.id for entry in queue_entries]
1492 else:
1493 assert host
1494 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001495
1496
jadmanski0afbb632008-06-06 21:10:57 +00001497 def poll(self):
showard08a36412009-05-05 01:01:13 +00001498 if not self.started:
1499 self.start()
1500 self.tick()
1501
1502
1503 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001504 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001505 exit_code = self.monitor.exit_code()
1506 if exit_code is None:
1507 return
1508 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001509 else:
1510 success = False
mbligh36768f02008-02-22 18:28:33 +00001511
jadmanski0afbb632008-06-06 21:10:57 +00001512 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001513
1514
jadmanski0afbb632008-06-06 21:10:57 +00001515 def is_done(self):
1516 return self.done
mbligh36768f02008-02-22 18:28:33 +00001517
1518
jadmanski0afbb632008-06-06 21:10:57 +00001519 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001520 if self.done:
1521 return
jadmanski0afbb632008-06-06 21:10:57 +00001522 self.done = True
1523 self.success = success
1524 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001525
1526
jadmanski0afbb632008-06-06 21:10:57 +00001527 def prolog(self):
showarded2afea2009-07-07 20:54:07 +00001528 assert not self.monitor
mblighd64e5702008-04-04 21:39:28 +00001529
mbligh36768f02008-02-22 18:28:33 +00001530
jadmanski0afbb632008-06-06 21:10:57 +00001531 def cleanup(self):
showardcdaeae82009-08-31 18:32:48 +00001532 if self.monitor and self.log_file:
1533 self.monitor.try_copy_to_results_repository(self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001534
1535
jadmanski0afbb632008-06-06 21:10:57 +00001536 def epilog(self):
1537 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001538
1539
jadmanski0afbb632008-06-06 21:10:57 +00001540 def start(self):
1541 assert self.agent
1542
1543 if not self.started:
1544 self.prolog()
1545 self.run()
1546
1547 self.started = True
1548
1549
1550 def abort(self):
1551 if self.monitor:
1552 self.monitor.kill()
1553 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001554 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001555 self.cleanup()
1556
1557
showarded2afea2009-07-07 20:54:07 +00001558 def _get_consistent_execution_path(self, execution_entries):
1559 first_execution_path = execution_entries[0].execution_path()
1560 for execution_entry in execution_entries[1:]:
1561 assert execution_entry.execution_path() == first_execution_path, (
1562 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1563 execution_entry,
1564 first_execution_path,
1565 execution_entries[0]))
1566 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001567
1568
showarded2afea2009-07-07 20:54:07 +00001569 def _copy_results(self, execution_entries, use_monitor=None):
1570 """
1571 @param execution_entries: list of objects with execution_path() method
1572 """
showard6d1c1432009-08-20 23:30:39 +00001573 if use_monitor is not None and not use_monitor.has_process():
1574 return
1575
showarded2afea2009-07-07 20:54:07 +00001576 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001577 if use_monitor is None:
1578 assert self.monitor
1579 use_monitor = self.monitor
1580 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001581 execution_path = self._get_consistent_execution_path(execution_entries)
1582 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001583 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001584
showarda1e74b32009-05-12 17:32:04 +00001585
1586 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001587 for queue_entry in queue_entries:
1588 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001589
1590
showarda1e74b32009-05-12 17:32:04 +00001591 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1592 self._copy_results(queue_entries, use_monitor)
1593 self._parse_results(queue_entries)
1594
1595
showardd3dc1992009-04-22 21:01:40 +00001596 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
showarded2afea2009-07-07 20:54:07 +00001597 assert not self.monitor
jadmanski0afbb632008-06-06 21:10:57 +00001598 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001599 self.monitor = PidfileRunMonitor()
1600 self.monitor.run(self.cmd, self._working_directory,
1601 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001602 log_file=self.log_file,
1603 pidfile_name=pidfile_name,
1604 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001605
1606
showardd9205182009-04-27 20:09:55 +00001607class TaskWithJobKeyvals(object):
1608 """AgentTask mixin providing functionality to help with job keyval files."""
1609 _KEYVAL_FILE = 'keyval'
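    # The keyval file is plain text with one key=value pair per line, for
    # example (the values are illustrative):
    #   job_queued=1252441200
    #   job_finished=1252444800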
1610 def _format_keyval(self, key, value):
1611 return '%s=%s' % (key, value)
1612
1613
1614 def _keyval_path(self):
1615 """Subclasses must override this"""
1616 raise NotImplemented
1617
1618
1619 def _write_keyval_after_job(self, field, value):
1620 assert self.monitor
1621 if not self.monitor.has_process():
1622 return
1623 _drone_manager.write_lines_to_file(
1624 self._keyval_path(), [self._format_keyval(field, value)],
1625 paired_with_process=self.monitor.get_process())
1626
1627
1628 def _job_queued_keyval(self, job):
1629 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1630
1631
1632 def _write_job_finished(self):
1633 self._write_keyval_after_job("job_finished", int(time.time()))
1634
1635
showarddb502762009-09-09 15:31:20 +00001636 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1637 keyval_contents = '\n'.join(self._format_keyval(key, value)
1638 for key, value in keyval_dict.iteritems())
1639 # always end with a newline to allow additional keyvals to be written
1640 keyval_contents += '\n'
1641 _drone_manager.attach_file_to_execution(self._working_directory,
1642 keyval_contents,
1643 file_path=keyval_path)
1644
1645
1646 def _write_keyvals_before_job(self, keyval_dict):
1647 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1648
1649
1650 def _write_host_keyvals(self, host):
1651 keyval_path = os.path.join(self._working_directory, 'host_keyvals',
1652 host.hostname)
1653 platform, all_labels = host.platform_and_labels()
1654 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1655 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1656
1657
showard8cc058f2009-09-08 16:26:33 +00001658class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001659 """
1660 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1661 """
1662
1663 TASK_TYPE = None
1664 host = None
1665 queue_entry = None
1666
1667 def __init__(self, task, extra_command_args, **kwargs):
showarded2afea2009-07-07 20:54:07 +00001668        assert self.TASK_TYPE is not None, (
1669                'self.TASK_TYPE must be overridden')
showard8cc058f2009-09-08 16:26:33 +00001670
1671 self.host = Host(id=task.host.id)
1672 self.queue_entry = None
1673 if task.queue_entry:
1674 self.queue_entry = HostQueueEntry(id=task.queue_entry.id)
1675
showarded2afea2009-07-07 20:54:07 +00001676 self.task = task
showarddb502762009-09-09 15:31:20 +00001677 kwargs['working_directory'] = task.execution_path()
showarded2afea2009-07-07 20:54:07 +00001678 self._extra_command_args = extra_command_args
1679 super(SpecialAgentTask, self).__init__(**kwargs)
1680
1681
showard8cc058f2009-09-08 16:26:33 +00001682 def _keyval_path(self):
1683 return os.path.join(self._working_directory, self._KEYVAL_FILE)
1684
1685
showarded2afea2009-07-07 20:54:07 +00001686 def prolog(self):
1687 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001688 self.cmd = _autoserv_command_line(self.host.hostname,
1689 self._extra_command_args,
1690 queue_entry=self.queue_entry)
1691 self._working_directory = self.task.execution_path()
1692 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001693 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001694
1695
showardde634ee2009-01-30 01:44:24 +00001696 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001697 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001698
showard2fe3f1d2009-07-06 20:19:11 +00001699 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001700 return # don't fail metahost entries, they'll be reassigned
1701
showard2fe3f1d2009-07-06 20:19:11 +00001702 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001703 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001704 return # entry has been aborted
1705
showard2fe3f1d2009-07-06 20:19:11 +00001706 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001707 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001708 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001709 self._write_keyval_after_job(queued_key, queued_time)
1710 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001711
showard8cc058f2009-09-08 16:26:33 +00001712 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001713 self.monitor.try_copy_results_on_drone(
1714 source_path=self._working_directory + '/',
1715 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001716
showard2fe3f1d2009-07-06 20:19:11 +00001717 self._copy_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001718 self.queue_entry.handle_host_failure()
showard2fe3f1d2009-07-06 20:19:11 +00001719 if self.queue_entry.job.parse_failed_repair:
1720 self._parse_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001721
1722 pidfile_id = _drone_manager.get_pidfile_id_from(
1723 self.queue_entry.execution_path(),
1724 pidfile_name=_AUTOSERV_PID_FILE)
1725 _drone_manager.register_pidfile(pidfile_id)
1726
1727
1728 def cleanup(self):
1729 super(SpecialAgentTask, self).cleanup()
1730 self.task.finish()
1731 if self.monitor and self.monitor.has_process():
1732 self._copy_results([self.task])
1733
1734
1735class RepairTask(SpecialAgentTask):
1736 TASK_TYPE = models.SpecialTask.Task.REPAIR
1737
1738
1739 def __init__(self, task, recover_run_monitor=None):
1740 """\
1741        task: the models.SpecialTask for this repair; its queue entry, if any, is marked failed when the repair fails.
1742 """
1743 protection = host_protections.Protection.get_string(
1744 task.host.protection)
1745 # normalize the protection name
1746 protection = host_protections.Protection.get_attr_name(protection)
1747
1748 super(RepairTask, self).__init__(
1749 task, ['-R', '--host-protection', protection],
1750 recover_run_monitor=recover_run_monitor)
1751
1752 # *don't* include the queue entry in IDs -- if the queue entry is
1753 # aborted, we want to leave the repair task running
1754 self._set_ids(host=self.host)
1755
1756
1757 def prolog(self):
1758 super(RepairTask, self).prolog()
1759 logging.info("repair_task starting")
1760 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001761
1762
jadmanski0afbb632008-06-06 21:10:57 +00001763 def epilog(self):
1764 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001765
jadmanski0afbb632008-06-06 21:10:57 +00001766 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001767 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001768 else:
showard8cc058f2009-09-08 16:26:33 +00001769 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001770 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001771 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001772
1773
showarded2afea2009-07-07 20:54:07 +00001774class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001775 def _copy_to_results_repository(self):
1776 if not self.queue_entry or self.queue_entry.meta_host:
1777 return
1778
1779 self.queue_entry.set_execution_subdir()
1780 log_name = os.path.basename(self.task.execution_path())
1781 source = os.path.join(self.task.execution_path(), 'debug',
1782 'autoserv.DEBUG')
1783 destination = os.path.join(
1784 self.queue_entry.execution_path(), log_name)
1785
1786 self.monitor.try_copy_to_results_repository(
1787 source, destination_path=destination)
1788
1789
showard170873e2009-01-07 00:22:26 +00001790 def epilog(self):
1791 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001792
showard775300b2009-09-09 15:30:50 +00001793 if self.success:
1794 return
showard8fe93b52008-11-18 17:53:22 +00001795
showard775300b2009-09-09 15:30:50 +00001796 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001797
showard775300b2009-09-09 15:30:50 +00001798 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
1799 return
1800
1801 if self.queue_entry:
1802 self.queue_entry.requeue()
1803
1804 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001805 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001806 queue_entry__id=self.queue_entry.id):
1807 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1808 self._fail_queue_entry()
1809 return
1810
1811 queue_entry = models.HostQueueEntry(id=self.queue_entry.id)
1812 else:
1813 queue_entry = None
1814
1815 models.SpecialTask.objects.create(
1816 host=models.Host(id=self.host.id),
1817 task=models.SpecialTask.Task.REPAIR,
1818 queue_entry=queue_entry)
showard58721a82009-08-20 23:32:40 +00001819
showard8fe93b52008-11-18 17:53:22 +00001820
1821class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001822 TASK_TYPE = models.SpecialTask.Task.VERIFY
1823
1824
showard8cc058f2009-09-08 16:26:33 +00001825 def __init__(self, task, recover_run_monitor=None):
showarded2afea2009-07-07 20:54:07 +00001826 super(VerifyTask, self).__init__(
showard8cc058f2009-09-08 16:26:33 +00001827 task, ['-v'], recover_run_monitor=recover_run_monitor)
mblighe2586682008-02-29 22:45:46 +00001828
showard8cc058f2009-09-08 16:26:33 +00001829 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001830
1831
jadmanski0afbb632008-06-06 21:10:57 +00001832 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001833 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001834
showardb18134f2009-03-20 20:52:18 +00001835 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001836 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001837 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1838 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001839
showarded2afea2009-07-07 20:54:07 +00001840 # Delete any other queued verifies for this host. One verify will do
1841 # and there's no need to keep records of other requests.
1842 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00001843 host__id=self.host.id,
1844 task=models.SpecialTask.Task.VERIFY,
showarded2afea2009-07-07 20:54:07 +00001845 is_active=False, is_complete=False)
1846 queued_verifies = queued_verifies.exclude(id=self.task.id)
1847 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00001848
mbligh36768f02008-02-22 18:28:33 +00001849
jadmanski0afbb632008-06-06 21:10:57 +00001850 def epilog(self):
1851 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001852 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001853 if self.queue_entry:
1854 self.queue_entry.on_pending()
1855 else:
1856 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001857
1858
showardb5626452009-06-30 01:57:28 +00001859class CleanupHostsMixin(object):
1860 def _reboot_hosts(self, job, queue_entries, final_success,
1861 num_tests_failed):
1862 reboot_after = job.reboot_after
1863 do_reboot = (
1864 # always reboot after aborted jobs
1865 self._final_status == models.HostQueueEntry.Status.ABORTED
1866 or reboot_after == models.RebootAfter.ALWAYS
1867 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
1868 and final_success and num_tests_failed == 0))
1869
1870 for queue_entry in queue_entries:
1871 if do_reboot:
1872 # don't pass the queue entry to the CleanupTask. if the cleanup
1873 # fails, the job doesn't care -- it's over.
showard8cc058f2009-09-08 16:26:33 +00001874 models.SpecialTask.objects.create(
1875 host=models.Host(id=queue_entry.host.id),
1876 task=models.SpecialTask.Task.CLEANUP)
showardb5626452009-06-30 01:57:28 +00001877 else:
showard8cc058f2009-09-08 16:26:33 +00001878 queue_entry.host.set_status(models.Host.Status.READY)
showardb5626452009-06-30 01:57:28 +00001879
1880
1881class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
showard8cc058f2009-09-08 16:26:33 +00001882 def __init__(self, job, queue_entries, cmd=None, recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001883 self.job = job
1884 self.queue_entries = queue_entries
showard8cc058f2009-09-08 16:26:33 +00001885 self.group_name = queue_entries[0].get_group_name()
showarded2afea2009-07-07 20:54:07 +00001886 super(QueueTask, self).__init__(
1887 cmd, self._execution_path(),
1888 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00001889 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001890
1891
showard73ec0442009-02-07 02:05:20 +00001892 def _keyval_path(self):
showarded2afea2009-07-07 20:54:07 +00001893 return os.path.join(self._execution_path(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001894
1895
showarded2afea2009-07-07 20:54:07 +00001896 def _execution_path(self):
1897 return self.queue_entries[0].execution_path()
mblighbb421852008-03-11 22:36:16 +00001898
1899
jadmanski0afbb632008-06-06 21:10:57 +00001900 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00001901 for entry in self.queue_entries:
1902 if entry.status not in (models.HostQueueEntry.Status.STARTING,
1903 models.HostQueueEntry.Status.RUNNING):
1904 raise SchedulerError('Queue task attempting to start '
1905 'entry with invalid status %s: %s'
1906 % (entry.status, entry))
1907 if entry.host.status not in (models.Host.Status.PENDING,
1908 models.Host.Status.RUNNING):
1909 raise SchedulerError('Queue task attempting to start on queue '
1910 'entry with invalid host status %s: %s'
1911 % (entry.host.status, entry))
1912
showardd9205182009-04-27 20:09:55 +00001913 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001914 keyval_dict = {queued_key: queued_time}
1915 if self.group_name:
1916 keyval_dict['host_group_name'] = self.group_name
1917 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001918 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001919 self._write_host_keyvals(queue_entry.host)
showard8cc058f2009-09-08 16:26:33 +00001920 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showard12f3e322009-05-13 21:27:42 +00001921 queue_entry.update_field('started_on', datetime.datetime.now())
showard8cc058f2009-09-08 16:26:33 +00001922 queue_entry.host.set_status(models.Host.Status.RUNNING)
showard21baa452008-10-21 00:08:39 +00001923 queue_entry.host.update_field('dirty', 1)
showardc6a56872009-07-28 20:11:58 +00001924 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
1925 # TODO(gps): Remove this if nothing needs it anymore.
1926 # A potential user is: tko/parser
jadmanski0afbb632008-06-06 21:10:57 +00001927 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001928
1929
showard35162b02009-03-03 02:17:30 +00001930 def _write_lost_process_error_file(self):
showarded2afea2009-07-07 20:54:07 +00001931 error_file_path = os.path.join(self._execution_path(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001932 _drone_manager.write_lines_to_file(error_file_path,
1933 [_LOST_PROCESS_ERROR])
1934
1935
showardd3dc1992009-04-22 21:01:40 +00001936 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001937 if not self.monitor:
1938 return
1939
showardd9205182009-04-27 20:09:55 +00001940 self._write_job_finished()
1941
showard35162b02009-03-03 02:17:30 +00001942 if self.monitor.lost_process:
1943 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001944
showard8cc058f2009-09-08 16:26:33 +00001945 for queue_entry in self.queue_entries:
1946 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001947
1948
showardcbd74612008-11-19 21:42:02 +00001949 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001950 _drone_manager.write_lines_to_file(
showarded2afea2009-07-07 20:54:07 +00001951 os.path.join(self._execution_path(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001952 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001953 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001954
1955
jadmanskif7fa2cc2008-10-01 14:13:23 +00001956 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001957 if not self.monitor or not self.monitor.has_process():
1958 return
1959
jadmanskif7fa2cc2008-10-01 14:13:23 +00001960 # build up sets of all the aborted_by and aborted_on values
1961 aborted_by, aborted_on = set(), set()
1962 for queue_entry in self.queue_entries:
1963 if queue_entry.aborted_by:
1964 aborted_by.add(queue_entry.aborted_by)
1965 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1966 aborted_on.add(t)
1967
1968 # extract some actual, unique aborted by value and write it out
1969 assert len(aborted_by) <= 1
1970 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001971 aborted_by_value = aborted_by.pop()
1972 aborted_on_value = max(aborted_on)
1973 else:
1974 aborted_by_value = 'autotest_system'
1975 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001976
showarda0382352009-02-11 23:36:43 +00001977 self._write_keyval_after_job("aborted_by", aborted_by_value)
1978 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001979
showardcbd74612008-11-19 21:42:02 +00001980 aborted_on_string = str(datetime.datetime.fromtimestamp(
1981 aborted_on_value))
1982 self._write_status_comment('Job aborted by %s on %s' %
1983 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001984
1985
jadmanski0afbb632008-06-06 21:10:57 +00001986 def abort(self):
1987 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001988 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001989 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001990
1991
jadmanski0afbb632008-06-06 21:10:57 +00001992 def epilog(self):
1993 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001994 self._finish_task()
1995 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001996
1997
showardd3dc1992009-04-22 21:01:40 +00001998class PostJobTask(AgentTask):
1999 def __init__(self, queue_entries, pidfile_name, logfile_name,
showarded2afea2009-07-07 20:54:07 +00002000 recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002001 self._queue_entries = queue_entries
2002 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00002003
showarded2afea2009-07-07 20:54:07 +00002004 self._execution_path = self._get_consistent_execution_path(
2005 queue_entries)
2006 self._results_dir = _drone_manager.absolute_path(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00002007 self._autoserv_monitor = PidfileRunMonitor()
showarded2afea2009-07-07 20:54:07 +00002008 self._autoserv_monitor.attach_to_existing_process(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00002009 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
2010
2011 if _testing_mode:
2012 command = 'true'
2013 else:
2014 command = self._generate_command(self._results_dir)
2015
showarded2afea2009-07-07 20:54:07 +00002016 super(PostJobTask, self).__init__(
2017 cmd=command, working_directory=self._execution_path,
2018 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002019
showarded2afea2009-07-07 20:54:07 +00002020 self.log_file = os.path.join(self._execution_path, logfile_name)
showardd3dc1992009-04-22 21:01:40 +00002021 self._final_status = self._determine_final_status()
2022
2023
2024 def _generate_command(self, results_dir):
2025 raise NotImplementedError('Subclasses must override this')
2026
2027
2028 def _job_was_aborted(self):
2029 was_aborted = None
2030 for queue_entry in self._queue_entries:
2031 queue_entry.update_from_database()
2032 if was_aborted is None: # first queue entry
2033 was_aborted = bool(queue_entry.aborted)
2034 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
2035 email_manager.manager.enqueue_notify_email(
2036 'Inconsistent abort state',
2037 'Queue entries have inconsistent abort state: ' +
2038 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
2039 # don't crash here, just assume true
2040 return True
2041 return was_aborted
2042
2043
2044 def _determine_final_status(self):
2045 if self._job_was_aborted():
2046 return models.HostQueueEntry.Status.ABORTED
2047
2048 # we'll use a PidfileRunMonitor to read the autoserv exit status
2049 if self._autoserv_monitor.exit_code() == 0:
2050 return models.HostQueueEntry.Status.COMPLETED
2051 return models.HostQueueEntry.Status.FAILED
2052
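    # Summary of the status chosen above (illustrative): any aborted queue
    # entry => ABORTED; otherwise autoserv exit code 0 => COMPLETED, and
    # anything else, including a lost process (which reports exit status 1),
    # => FAILED.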
2053
2054 def run(self):
showard8cc058f2009-09-08 16:26:33 +00002055 # Make sure we actually have results to work with.
2056 # This should never happen in normal operation.
showard5add1c82009-05-26 19:27:46 +00002057 if not self._autoserv_monitor.has_process():
2058 email_manager.manager.enqueue_notify_email(
showard8cc058f2009-09-08 16:26:33 +00002059 'No results in post-job task',
2060 'No results in post-job task at %s' %
2061 self._autoserv_monitor.pidfile_id)
showard5add1c82009-05-26 19:27:46 +00002062 self.finished(False)
2063 return
2064
2065 super(PostJobTask, self).run(
2066 pidfile_name=self._pidfile_name,
2067 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00002068
2069
2070 def _set_all_statuses(self, status):
2071 for queue_entry in self._queue_entries:
2072 queue_entry.set_status(status)
2073
2074
2075 def abort(self):
2076 # override AgentTask.abort() to avoid killing the process and ending
2077 # the task. post-job tasks continue when the job is aborted.
2078 pass
2079
2080
showardb5626452009-06-30 01:57:28 +00002081class GatherLogsTask(PostJobTask, CleanupHostsMixin):
showardd3dc1992009-04-22 21:01:40 +00002082 """
2083 Task responsible for
2084 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2085 * copying logs to the results repository
2086 * spawning CleanupTasks for hosts, if necessary
2087 * spawning a FinalReparseTask for the job
2088 """
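    # For example (illustrative): if autoserv was killed by a signal such as
    # SIGKILL, run() below sees os.WIFSIGNALED(exit_code) as true and actually
    # launches the --collect-crashinfo command; on a clean autoserv exit the
    # task simply calls finished(True) without spawning anything.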
showarded2afea2009-07-07 20:54:07 +00002089 def __init__(self, job, queue_entries, recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002090 self._job = job
2091 super(GatherLogsTask, self).__init__(
2092 queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
showarded2afea2009-07-07 20:54:07 +00002093 logfile_name='.collect_crashinfo.log',
2094 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002095 self._set_ids(queue_entries=queue_entries)
2096
2097
2098 def _generate_command(self, results_dir):
2099 host_list = ','.join(queue_entry.host.hostname
2100 for queue_entry in self._queue_entries)
2101        return [_autoserv_path, '-p', '--collect-crashinfo', '-m', host_list,
2102 '-r', results_dir]
2103
2104
2105 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00002106 for queue_entry in self._queue_entries:
2107 if queue_entry.status != models.HostQueueEntry.Status.GATHERING:
2108 raise SchedulerError('Gather task attempting to start on '
2109 'non-gathering entry: %s' % queue_entry)
2110 if queue_entry.host.status != models.Host.Status.RUNNING:
2111 raise SchedulerError('Gather task attempting to start on queue '
2112 'entry with non-running host: '
2113 '%s' % queue_entry)
2114
showardd3dc1992009-04-22 21:01:40 +00002115 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002116
2117
showardd3dc1992009-04-22 21:01:40 +00002118 def epilog(self):
2119 super(GatherLogsTask, self).epilog()
showardb5626452009-06-30 01:57:28 +00002120
showard6d1c1432009-08-20 23:30:39 +00002121 self._copy_and_parse_results(self._queue_entries,
2122 use_monitor=self._autoserv_monitor)
2123
2124 if self._autoserv_monitor.has_process():
2125 final_success = (self._final_status ==
2126 models.HostQueueEntry.Status.COMPLETED)
2127 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2128 else:
2129 final_success = False
2130 num_tests_failed = 0
2131
showardb5626452009-06-30 01:57:28 +00002132 self._reboot_hosts(self._job, self._queue_entries, final_success,
2133 num_tests_failed)
showardd3dc1992009-04-22 21:01:40 +00002134
2135
showard0bbfc212009-04-29 21:06:13 +00002136 def run(self):
showard597bfd32009-05-08 18:22:50 +00002137 autoserv_exit_code = self._autoserv_monitor.exit_code()
2138 # only run if Autoserv exited due to some signal. if we have no exit
2139 # code, assume something bad (and signal-like) happened.
2140 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002141 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002142 else:
2143 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002144
2145
showard8fe93b52008-11-18 17:53:22 +00002146class CleanupTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002147 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2148
2149
showard8cc058f2009-09-08 16:26:33 +00002150 def __init__(self, task, recover_run_monitor=None):
showarded2afea2009-07-07 20:54:07 +00002151 super(CleanupTask, self).__init__(
showard8cc058f2009-09-08 16:26:33 +00002152 task, ['--cleanup'], recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002153
showard8cc058f2009-09-08 16:26:33 +00002154 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mbligh16c722d2008-03-05 00:58:44 +00002155
mblighd5c95802008-03-05 00:33:46 +00002156
jadmanski0afbb632008-06-06 21:10:57 +00002157 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002158 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002159 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard8cc058f2009-09-08 16:26:33 +00002160 self.host.set_status(models.Host.Status.CLEANING)
2161 if self.queue_entry:
2162 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2163
2164
showard775300b2009-09-09 15:30:50 +00002165 def _finish_epilog(self):
2166 if not self.queue_entry:
2167 return
2168
2169 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
2170 self.queue_entry.on_pending()
2171 elif self.success:
2172 if self.queue_entry.job.run_verify:
2173 entry = models.HostQueueEntry(id=self.queue_entry.id)
2174 models.SpecialTask.objects.create(
2175 host=models.Host(id=self.host.id),
2176 queue_entry=entry,
2177 task=models.SpecialTask.Task.VERIFY)
2178 else:
2179 self.queue_entry.on_pending()
mblighd5c95802008-03-05 00:33:46 +00002180
mblighd5c95802008-03-05 00:33:46 +00002181
showard21baa452008-10-21 00:08:39 +00002182 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002183 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002184
showard21baa452008-10-21 00:08:39 +00002185 if self.success:
2186 self.host.update_field('dirty', 0)
showard775300b2009-09-09 15:30:50 +00002187 self.host.set_status(models.Host.Status.READY)
showard21baa452008-10-21 00:08:39 +00002188
showard775300b2009-09-09 15:30:50 +00002189 self._finish_epilog()
showard8cc058f2009-09-08 16:26:33 +00002190
showard21baa452008-10-21 00:08:39 +00002191
showardd3dc1992009-04-22 21:01:40 +00002192class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002193 _num_running_parses = 0
2194
showarded2afea2009-07-07 20:54:07 +00002195 def __init__(self, queue_entries, recover_run_monitor=None):
2196 super(FinalReparseTask, self).__init__(
2197 queue_entries, pidfile_name=_PARSER_PID_FILE,
2198 logfile_name='.parse.log',
2199 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002200 # don't use _set_ids, since we don't want to set the host_ids
2201 self.queue_entry_ids = [entry.id for entry in queue_entries]
showarded2afea2009-07-07 20:54:07 +00002202 self._parse_started = self.started
showard97aed502008-11-04 02:01:24 +00002203
showard97aed502008-11-04 02:01:24 +00002204
2205 @classmethod
2206 def _increment_running_parses(cls):
2207 cls._num_running_parses += 1
2208
2209
2210 @classmethod
2211 def _decrement_running_parses(cls):
2212 cls._num_running_parses -= 1
2213
2214
2215 @classmethod
2216 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002217 return (cls._num_running_parses <
2218 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002219
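    # Illustrative sketch of the throttling above: with max_parse_processes
    # set to 5, a sixth FinalReparseTask keeps returning from
    # _try_starting_parse() on every tick without launching the parser until
    # one of the running parses calls finished() and decrements the counter.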
2220
2221 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00002222 for queue_entry in self._queue_entries:
2223 if queue_entry.status != models.HostQueueEntry.Status.PARSING:
2224 raise SchedulerError('Parse task attempting to start on '
2225 'non-parsing entry: %s' % queue_entry)
2226
showard97aed502008-11-04 02:01:24 +00002227 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002228
2229
2230 def epilog(self):
2231 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002232 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002233
2234
showardd3dc1992009-04-22 21:01:40 +00002235 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002236 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002237 results_dir]
showard97aed502008-11-04 02:01:24 +00002238
2239
showard08a36412009-05-05 01:01:13 +00002240 def tick(self):
2241 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002242 # and we can, at which point we revert to default behavior
2243 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002244 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002245 else:
2246 self._try_starting_parse()
2247
2248
2249 def run(self):
2250 # override run() to not actually run unless we can
2251 self._try_starting_parse()
2252
2253
2254 def _try_starting_parse(self):
2255 if not self._can_run_new_parse():
2256 return
showard170873e2009-01-07 00:22:26 +00002257
showard97aed502008-11-04 02:01:24 +00002258 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002259 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002260
showard97aed502008-11-04 02:01:24 +00002261 self._increment_running_parses()
2262 self._parse_started = True
2263
2264
2265 def finished(self, success):
2266 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002267 if self._parse_started:
2268 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002269
2270
showarda3c58572009-03-12 20:36:59 +00002271class DBError(Exception):
2272 """Raised by the DBObject constructor when its select fails."""
2273
2274
mbligh36768f02008-02-22 18:28:33 +00002275class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002276 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002277
2278 # Subclasses MUST override these:
2279 _table_name = ''
2280 _fields = ()
2281
showarda3c58572009-03-12 20:36:59 +00002282 # A mapping from (type, id) to the instance of the object for that
2283 # particular id. This prevents us from creating new Job() and Host()
2284 # instances for every HostQueueEntry object that we instantiate as
2285 # multiple HQEs often share the same Job.
2286 _instances_by_type_and_id = weakref.WeakValueDictionary()
2287 _initialized = False
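    # A subclass only needs to name its table and columns; for example,
    # IneligibleHostQueue later in this file is just:
    #   class IneligibleHostQueue(DBObject):
    #       _table_name = 'ineligible_host_queues'
    #       _fields = ('id', 'job_id', 'host_id')
    # after which fetch() can be used like (42 is a made-up job id):
    #   entries = IneligibleHostQueue.fetch(where='job_id = %s', params=(42,))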
showard6ae5ea92009-02-25 00:11:51 +00002288
showarda3c58572009-03-12 20:36:59 +00002289
2290 def __new__(cls, id=None, **kwargs):
2291 """
2292 Look to see if we already have an instance for this particular type
2293 and id. If so, use it instead of creating a duplicate instance.
2294 """
2295 if id is not None:
2296 instance = cls._instances_by_type_and_id.get((cls, id))
2297 if instance:
2298 return instance
2299 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2300
2301
2302 def __init__(self, id=None, row=None, new_record=False, always_query=True):
showard8cc058f2009-09-08 16:26:33 +00002303 assert bool(id) or bool(row)
2304 if id is not None and row is not None:
2305 assert id == row[0]
showard6ae5ea92009-02-25 00:11:51 +00002306 assert self._table_name, '_table_name must be defined in your class'
2307 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002308 if not new_record:
2309 if self._initialized and not always_query:
2310 return # We've already been initialized.
2311 if id is None:
2312 id = row[0]
2313 # Tell future constructors to use us instead of re-querying while
2314 # this instance is still around.
2315 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002316
showard6ae5ea92009-02-25 00:11:51 +00002317 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002318
jadmanski0afbb632008-06-06 21:10:57 +00002319 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002320
jadmanski0afbb632008-06-06 21:10:57 +00002321 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002322 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002323
showarda3c58572009-03-12 20:36:59 +00002324 if self._initialized:
2325 differences = self._compare_fields_in_row(row)
2326 if differences:
showard7629f142009-03-27 21:02:02 +00002327 logging.warn(
2328                    'requery of initialized %s instance (id %s) is updating: %s',
2329 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002330 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002331 self._initialized = True
2332
2333
2334 @classmethod
2335 def _clear_instance_cache(cls):
2336 """Used for testing, clear the internal instance cache."""
2337 cls._instances_by_type_and_id.clear()
2338
2339
showardccbd6c52009-03-21 00:10:21 +00002340 def _fetch_row_from_db(self, row_id):
2341 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2342 rows = _db.execute(sql, (row_id,))
2343 if not rows:
showard76e29d12009-04-15 21:53:10 +00002344 raise DBError("row not found (table=%s, row id=%s)"
2345 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002346 return rows[0]
2347
2348
showarda3c58572009-03-12 20:36:59 +00002349 def _assert_row_length(self, row):
2350 assert len(row) == len(self._fields), (
2351 "table = %s, row = %s/%d, fields = %s/%d" % (
2352 self.__table, row, len(row), self._fields, len(self._fields)))
2353
2354
2355 def _compare_fields_in_row(self, row):
2356 """
2357 Given a row as returned by a SELECT query, compare it to our existing
2358 in memory fields.
2359
2360 @param row - A sequence of values corresponding to fields named in
2361 The class attribute _fields.
2362
2363 @returns A dictionary listing the differences keyed by field name
2364 containing tuples of (current_value, row_value).
2365 """
2366 self._assert_row_length(row)
2367 differences = {}
2368 for field, row_value in itertools.izip(self._fields, row):
2369 current_value = getattr(self, field)
2370 if current_value != row_value:
2371 differences[field] = (current_value, row_value)
2372 return differences
showard2bab8f42008-11-12 18:15:22 +00002373
2374
2375 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002376 """
2377 Update our field attributes using a single row returned by SELECT.
2378
2379 @param row - A sequence of values corresponding to fields named in
2380 the class fields list.
2381 """
2382 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002383
showard2bab8f42008-11-12 18:15:22 +00002384 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002385 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002386 setattr(self, field, value)
2387 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002388
showard2bab8f42008-11-12 18:15:22 +00002389 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002390
mblighe2586682008-02-29 22:45:46 +00002391
showardccbd6c52009-03-21 00:10:21 +00002392 def update_from_database(self):
2393 assert self.id is not None
2394 row = self._fetch_row_from_db(self.id)
2395 self._update_fields_from_row(row)
2396
2397
jadmanski0afbb632008-06-06 21:10:57 +00002398 def count(self, where, table = None):
2399 if not table:
2400 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002401
jadmanski0afbb632008-06-06 21:10:57 +00002402 rows = _db.execute("""
2403 SELECT count(*) FROM %s
2404 WHERE %s
2405 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002406
jadmanski0afbb632008-06-06 21:10:57 +00002407 assert len(rows) == 1
2408
2409 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002410
2411
showardd3dc1992009-04-22 21:01:40 +00002412 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002413 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002414
showard2bab8f42008-11-12 18:15:22 +00002415 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002416 return
mbligh36768f02008-02-22 18:28:33 +00002417
mblighf8c624d2008-07-03 16:58:45 +00002418 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002419 _db.execute(query, (value, self.id))
2420
showard2bab8f42008-11-12 18:15:22 +00002421 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002422
2423
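    # Note: save() only INSERTs rows for newly created (new_record=True)
    # instances; existing rows are modified one column at a time through
    # update_field() above.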
jadmanski0afbb632008-06-06 21:10:57 +00002424 def save(self):
2425 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002426 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002427 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002428 values = []
2429 for key in keys:
2430 value = getattr(self, key)
2431 if value is None:
2432 values.append('NULL')
2433 else:
2434 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002435 values_str = ','.join(values)
2436 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2437 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002438 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002439 # Update our id to the one the database just assigned to us.
2440 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002441
2442
jadmanski0afbb632008-06-06 21:10:57 +00002443 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002444        self._instances_by_type_and_id.pop((type(self), self.id), None)
2445 self._initialized = False
2446 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002447 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2448 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002449
2450
showard63a34772008-08-18 19:32:50 +00002451 @staticmethod
2452 def _prefix_with(string, prefix):
2453 if string:
2454 string = prefix + string
2455 return string
2456
2457
jadmanski0afbb632008-06-06 21:10:57 +00002458 @classmethod
showard989f25d2008-10-01 11:38:11 +00002459 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002460 """
2461 Construct instances of our class based on the given database query.
2462
2463        @returns A list of instances of this class, one for each row fetched.
2464 """
showard63a34772008-08-18 19:32:50 +00002465 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2466 where = cls._prefix_with(where, 'WHERE ')
2467 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002468 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002469 'joins' : joins,
2470 'where' : where,
2471 'order_by' : order_by})
2472 rows = _db.execute(query, params)
showard8cc058f2009-09-08 16:26:33 +00002473 return [cls(id=row[0], row=row) for row in rows]
mblighe2586682008-02-29 22:45:46 +00002474
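    # Illustrative sketch of typical fetch() usage (the WHERE clause and
    # values below are hypothetical):
    #   queued = HostQueueEntry.fetch(where='status = %s', params=('Queued',),
    #                                 order_by='id')
    #   for entry in queued:
    #       ...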
mbligh36768f02008-02-22 18:28:33 +00002475
2476class IneligibleHostQueue(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002477 _table_name = 'ineligible_host_queues'
2478 _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002479
2480
showard89f84db2009-03-12 20:39:13 +00002481class AtomicGroup(DBObject):
2482 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002483 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2484 'invalid')
showard89f84db2009-03-12 20:39:13 +00002485
2486
showard989f25d2008-10-01 11:38:11 +00002487class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002488 _table_name = 'labels'
2489 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002490 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002491
2492
showard6157c632009-07-06 20:19:31 +00002493 def __repr__(self):
2494 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
2495 self.name, self.id, self.atomic_group_id)
2496
2497
mbligh36768f02008-02-22 18:28:33 +00002498class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002499 _table_name = 'hosts'
2500 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2501 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2502
2503
jadmanski0afbb632008-06-06 21:10:57 +00002504    def set_status(self, status):
showardb18134f2009-03-20 20:52:18 +00002505        logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002506        self.update_field('status', status)
mbligh36768f02008-02-22 18:28:33 +00002507
2508
showard170873e2009-01-07 00:22:26 +00002509 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002510 """
showard170873e2009-01-07 00:22:26 +00002511 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002512 """
2513 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002514 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002515 FROM labels
2516 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002517 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002518 ORDER BY labels.name
2519 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002520 platform = None
2521 all_labels = []
2522 for label_name, is_platform in rows:
2523 if is_platform:
2524 platform = label_name
2525 all_labels.append(label_name)
2526 return platform, all_labels
2527
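    # Illustrative example (hypothetical labels): a host labeled 'bluetooth'
    # plus the platform label 'netbook' yields
    # ('netbook', ['bluetooth', 'netbook']).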
2528
showard54c1ea92009-05-20 00:32:58 +00002529 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2530
2531
2532 @classmethod
2533 def cmp_for_sort(cls, a, b):
2534 """
2535 A comparison function for sorting Host objects by hostname.
2536
2537 This strips any trailing numeric digits, ignores leading 0s and
2538 compares hostnames by the leading name and the trailing digits as a
2539 number. If both hostnames do not match this pattern, they are simply
2540 compared as lower case strings.
2541
2542 Example of how hostnames will be sorted:
2543
2544 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2545
2546        This hopefully satisfies most people's hostname sorting needs regardless
2547 of their exact naming schemes. Nobody sane should have both a host10
2548 and host010 (but the algorithm works regardless).
2549 """
2550 lower_a = a.hostname.lower()
2551 lower_b = b.hostname.lower()
2552 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2553 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2554 if match_a and match_b:
2555 name_a, number_a_str = match_a.groups()
2556 name_b, number_b_str = match_b.groups()
2557 number_a = int(number_a_str.lstrip('0'))
2558 number_b = int(number_b_str.lstrip('0'))
2559 result = cmp((name_a, number_a), (name_b, number_b))
2560 if result == 0 and lower_a != lower_b:
2561 # If they compared equal above but the lower case names are
2562 # indeed different, don't report equality. abc012 != abc12.
2563 return cmp(lower_a, lower_b)
2564 return result
2565 else:
2566 return cmp(lower_a, lower_b)
2567
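    # Illustrative usage sketch ('hosts' is a hypothetical list of Host
    # objects):
    #   hosts.sort(cmp=Host.cmp_for_sort)
    # which orders them as in the docstring example above.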
2568
mbligh36768f02008-02-22 18:28:33 +00002569class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002570 _table_name = 'host_queue_entries'
2571 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002572 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002573 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002574
2575
showarda3c58572009-03-12 20:36:59 +00002576 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002577 assert id or row
showarda3c58572009-03-12 20:36:59 +00002578 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002579 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002580
jadmanski0afbb632008-06-06 21:10:57 +00002581 if self.host_id:
2582 self.host = Host(self.host_id)
2583 else:
2584 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002585
showard77182562009-06-10 00:16:05 +00002586 if self.atomic_group_id:
2587 self.atomic_group = AtomicGroup(self.atomic_group_id,
2588 always_query=False)
2589 else:
2590 self.atomic_group = None
2591
showard170873e2009-01-07 00:22:26 +00002592 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002593 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002594
2595
showard89f84db2009-03-12 20:39:13 +00002596 @classmethod
2597 def clone(cls, template):
2598 """
2599 Creates a new row using the values from a template instance.
2600
2601 The new instance will not exist in the database or have a valid
2602 id attribute until its save() method is called.
2603 """
2604 assert isinstance(template, cls)
2605 new_row = [getattr(template, field) for field in cls._fields]
2606 clone = cls(row=new_row, new_record=True)
2607 clone.id = None
2608 return clone
2609
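    # Illustrative sketch of clone() usage (hypothetical; not taken from the
    # scheduler flow shown here):
    #   new_entry = HostQueueEntry.clone(existing_entry)
    #   new_entry.save()   # INSERTs a new row and assigns a fresh id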
2610
showardc85c21b2008-11-24 22:17:37 +00002611 def _view_job_url(self):
2612 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2613
2614
showardf1ae3542009-05-11 19:26:02 +00002615 def get_labels(self):
2616 """
2617 Get all labels associated with this host queue entry (either via the
2618 meta_host or as a job dependency label). The labels yielded are not
2619 guaranteed to be unique.
2620
2621 @yields Label instances associated with this host_queue_entry.
2622 """
2623 if self.meta_host:
2624 yield Label(id=self.meta_host, always_query=False)
2625 labels = Label.fetch(
2626 joins="JOIN jobs_dependency_labels AS deps "
2627 "ON (labels.id = deps.label_id)",
2628 where="deps.job_id = %d" % self.job.id)
2629 for label in labels:
2630 yield label
2631
2632
jadmanski0afbb632008-06-06 21:10:57 +00002633 def set_host(self, host):
2634 if host:
2635 self.queue_log_record('Assigning host ' + host.hostname)
2636 self.update_field('host_id', host.id)
2637 self.update_field('active', True)
2638 self.block_host(host.id)
2639 else:
2640 self.queue_log_record('Releasing host')
2641 self.unblock_host(self.host.id)
2642 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002643
jadmanski0afbb632008-06-06 21:10:57 +00002644 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002645
2646
jadmanski0afbb632008-06-06 21:10:57 +00002647 def get_host(self):
2648 return self.host
mbligh36768f02008-02-22 18:28:33 +00002649
2650
jadmanski0afbb632008-06-06 21:10:57 +00002651 def queue_log_record(self, log_line):
2652 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002653 _drone_manager.write_lines_to_file(self.queue_log_path,
2654 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002655
2656
jadmanski0afbb632008-06-06 21:10:57 +00002657 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002658 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002659 row = [0, self.job.id, host_id]
2660 block = IneligibleHostQueue(row=row, new_record=True)
2661 block.save()
mblighe2586682008-02-29 22:45:46 +00002662
2663
jadmanski0afbb632008-06-06 21:10:57 +00002664 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002665 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002666 blocks = IneligibleHostQueue.fetch(
2667 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2668 for block in blocks:
2669 block.delete()
mblighe2586682008-02-29 22:45:46 +00002670
2671
showard2bab8f42008-11-12 18:15:22 +00002672 def set_execution_subdir(self, subdir=None):
2673 if subdir is None:
2674 assert self.get_host()
2675 subdir = self.get_host().hostname
2676 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002677
2678
showard6355f6b2008-12-05 18:52:13 +00002679 def _get_hostname(self):
2680 if self.host:
2681 return self.host.hostname
2682 return 'no host'
2683
2684
showard170873e2009-01-07 00:22:26 +00002685 def __str__(self):
2686 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2687
2688
jadmanski0afbb632008-06-06 21:10:57 +00002689 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002690 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002691
showardb18134f2009-03-20 20:52:18 +00002692 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002693
showard8cc058f2009-09-08 16:26:33 +00002694 if status in (models.HostQueueEntry.Status.QUEUED,
2695 models.HostQueueEntry.Status.PARSING):
jadmanski0afbb632008-06-06 21:10:57 +00002696 self.update_field('complete', False)
2697 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002698
showard8cc058f2009-09-08 16:26:33 +00002699 if status in (models.HostQueueEntry.Status.PENDING,
2700 models.HostQueueEntry.Status.RUNNING,
2701 models.HostQueueEntry.Status.VERIFYING,
2702 models.HostQueueEntry.Status.STARTING,
2703 models.HostQueueEntry.Status.GATHERING):
jadmanski0afbb632008-06-06 21:10:57 +00002704 self.update_field('complete', False)
2705 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002706
showard8cc058f2009-09-08 16:26:33 +00002707 if status in (models.HostQueueEntry.Status.FAILED,
2708 models.HostQueueEntry.Status.COMPLETED,
2709 models.HostQueueEntry.Status.STOPPED,
2710 models.HostQueueEntry.Status.ABORTED):
jadmanski0afbb632008-06-06 21:10:57 +00002711 self.update_field('complete', True)
2712 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002713
2714 should_email_status = (status.lower() in _notify_email_statuses or
2715 'all' in _notify_email_statuses)
2716 if should_email_status:
2717 self._email_on_status(status)
2718
2719 self._email_on_job_complete()
2720
2721
2722 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002723 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002724
2725 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2726 self.job.id, self.job.name, hostname, status)
2727 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2728 self.job.id, self.job.name, hostname, status,
2729 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002730 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002731
2732
2733 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002734 if not self.job.is_finished():
2735 return
showard542e8402008-09-19 20:16:18 +00002736
showardc85c21b2008-11-24 22:17:37 +00002737 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002738 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002739 for queue_entry in hosts_queue:
2740 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002741 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002742 queue_entry.status))
2743
2744 summary_text = "\n".join(summary_text)
2745 status_counts = models.Job.objects.get_status_counts(
2746 [self.job.id])[self.job.id]
2747 status = ', '.join('%d %s' % (count, status) for status, count
2748 in status_counts.iteritems())
2749
2750 subject = 'Autotest: Job ID: %s "%s" %s' % (
2751 self.job.id, self.job.name, status)
2752 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2753 self.job.id, self.job.name, status, self._view_job_url(),
2754 summary_text)
showard170873e2009-01-07 00:22:26 +00002755 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002756
2757
showard8cc058f2009-09-08 16:26:33 +00002758 def schedule_pre_job_tasks(self, assigned_host=None):
showard77182562009-06-10 00:16:05 +00002759 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002760 assert assigned_host
2761 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002762 if self.host_id is None:
2763 self.set_host(assigned_host)
2764 else:
2765 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002766
showardcfd4a7e2009-07-11 01:47:33 +00002767 logging.info("%s/%s/%s scheduled on %s, status=%s",
showardb18134f2009-03-20 20:52:18 +00002768 self.job.name, self.meta_host, self.atomic_group_id,
2769 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002770
showard8cc058f2009-09-08 16:26:33 +00002771 self._do_schedule_pre_job_tasks()
showard77182562009-06-10 00:16:05 +00002772
2773
showard8cc058f2009-09-08 16:26:33 +00002774 def _do_schedule_pre_job_tasks(self):
showard77182562009-06-10 00:16:05 +00002775 # Every host goes thru the Verifying stage (which may or may not
2776 # actually do anything as determined by get_pre_job_tasks).
2777 self.set_status(models.HostQueueEntry.Status.VERIFYING)
showard8cc058f2009-09-08 16:26:33 +00002778 self.job.schedule_pre_job_tasks(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002779
showard6ae5ea92009-02-25 00:11:51 +00002780
jadmanski0afbb632008-06-06 21:10:57 +00002781 def requeue(self):
showardcfd4a7e2009-07-11 01:47:33 +00002782 assert self.host
showard8cc058f2009-09-08 16:26:33 +00002783 self.set_status(models.HostQueueEntry.Status.QUEUED)
showard12f3e322009-05-13 21:27:42 +00002784 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002785 # verify/cleanup failure sets the execution subdir, so reset it here
2786 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002787 if self.meta_host:
2788 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002789
2790
jadmanski0afbb632008-06-06 21:10:57 +00002791 def handle_host_failure(self):
2792 """\
2793 Called when this queue entry's host has failed verification and
2794 repair.
2795 """
2796 assert not self.meta_host
showard8cc058f2009-09-08 16:26:33 +00002797 self.set_status(models.HostQueueEntry.Status.FAILED)
showard2bab8f42008-11-12 18:15:22 +00002798 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002799
2800
jadmanskif7fa2cc2008-10-01 14:13:23 +00002801 @property
2802 def aborted_by(self):
2803 self._load_abort_info()
2804 return self._aborted_by
2805
2806
2807 @property
2808 def aborted_on(self):
2809 self._load_abort_info()
2810 return self._aborted_on
2811
2812
2813 def _load_abort_info(self):
2814 """ Fetch info about who aborted the job. """
2815 if hasattr(self, "_aborted_by"):
2816 return
2817 rows = _db.execute("""
2818 SELECT users.login, aborted_host_queue_entries.aborted_on
2819 FROM aborted_host_queue_entries
2820 INNER JOIN users
2821 ON users.id = aborted_host_queue_entries.aborted_by_id
2822 WHERE aborted_host_queue_entries.queue_entry_id = %s
2823 """, (self.id,))
2824 if rows:
2825 self._aborted_by, self._aborted_on = rows[0]
2826 else:
2827 self._aborted_by = self._aborted_on = None
2828
2829
showardb2e2c322008-10-14 17:33:55 +00002830 def on_pending(self):
2831 """
2832 Called when an entry in a synchronous job has passed verify. If the
showard8cc058f2009-09-08 16:26:33 +00002833 job is ready to run, sets the entries to STARTING. Otherwise, it leaves
2834 them in PENDING.
showardb2e2c322008-10-14 17:33:55 +00002835 """
showard8cc058f2009-09-08 16:26:33 +00002836 self.set_status(models.HostQueueEntry.Status.PENDING)
2837 self.host.set_status(models.Host.Status.PENDING)
showardb000a8d2009-07-28 20:02:07 +00002838
2839 # Some debug code here: sends an email if an asynchronous job does not
2840 # immediately enter Starting.
2841 # TODO: Remove this once we figure out why asynchronous jobs are getting
2842 # stuck in Pending.
showard8cc058f2009-09-08 16:26:33 +00002843 self.job.run_if_ready(queue_entry=self)
2844 if (self.job.synch_count == 1 and
2845 self.status == models.HostQueueEntry.Status.PENDING):
showardb000a8d2009-07-28 20:02:07 +00002846 subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
2847 message = 'Asynchronous job stuck in Pending'
2848 email_manager.manager.enqueue_notify_email(subject, message)
showardb2e2c322008-10-14 17:33:55 +00002849
2850
showardd3dc1992009-04-22 21:01:40 +00002851 def abort(self, dispatcher):
2852 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002853
showardd3dc1992009-04-22 21:01:40 +00002854 Status = models.HostQueueEntry.Status
showard8cc058f2009-09-08 16:26:33 +00002855 if self.status in (Status.GATHERING, Status.PARSING):
showardd3dc1992009-04-22 21:01:40 +00002856 # do nothing; post-job tasks will finish and then mark this entry
2857 # with status "Aborted" and take care of the host
2858 return
2859
showard8cc058f2009-09-08 16:26:33 +00002860 if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
2861 assert not dispatcher.get_agents_for_entry(self)
showardd3dc1992009-04-22 21:01:40 +00002862 self.host.set_status(models.Host.Status.READY)
2863 elif self.status == Status.VERIFYING:
showard8cc058f2009-09-08 16:26:33 +00002864 models.SpecialTask.objects.create(
2865 task=models.SpecialTask.Task.CLEANUP,
2866 host=models.Host(id=self.host.id))
showardd3dc1992009-04-22 21:01:40 +00002867
2868 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002869
showard8cc058f2009-09-08 16:26:33 +00002870
2871 def get_group_name(self):
2872 atomic_group = self.atomic_group
2873 if not atomic_group:
2874 return ''
2875
2876 # Look at any meta_host and dependency labels and pick the first
2877 # one that also specifies this atomic group. Use that label name
2878 # as the group name if possible (it is more specific).
2879 for label in self.get_labels():
2880 if label.atomic_group_id:
2881 assert label.atomic_group_id == atomic_group.id
2882 return label.name
2883 return atomic_group.name
2884
2885
showard170873e2009-01-07 00:22:26 +00002886 def execution_tag(self):
2887 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00002888 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002889
2890
showarded2afea2009-07-07 20:54:07 +00002891 def execution_path(self):
2892 return self.execution_tag()
2893
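    # Illustrative example (hypothetical values): for job 123 owned by
    # 'debug_user' with execution_subdir 'host1', the execution tag/path is
    # '123-debug_user/host1'.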
2894
mbligh36768f02008-02-22 18:28:33 +00002895class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002896 _table_name = 'jobs'
2897 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2898 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002899 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002900 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002901
showard77182562009-06-10 00:16:05 +00002902 # This does not need to be a column in the DB. The delays are likely to
2903 # be configured short. If the scheduler is stopped and restarted in
2904 # the middle of a job's delay cycle, the delay cycle will either be
2905 # repeated or skipped depending on the number of Pending machines found
2906 # when the restarted scheduler recovers to track it. Not a problem.
2907 #
2908 # A reference to the DelayedCallTask that will wake up the job should
2909 # no other HQEs change state in time. Its end_time attribute is used
2910 # by our run_with_ready_delay() method to determine if the wait is over.
2911 _delay_ready_task = None
2912
2913 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
2914    # all status='Pending' atomic group HQEs in case a delay was running when the
2915 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00002916
showarda3c58572009-03-12 20:36:59 +00002917 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002918 assert id or row
showarda3c58572009-03-12 20:36:59 +00002919 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002920
mblighe2586682008-02-29 22:45:46 +00002921
jadmanski0afbb632008-06-06 21:10:57 +00002922 def is_server_job(self):
2923 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002924
2925
showard170873e2009-01-07 00:22:26 +00002926 def tag(self):
2927 return "%s-%s" % (self.id, self.owner)
2928
2929
jadmanski0afbb632008-06-06 21:10:57 +00002930 def get_host_queue_entries(self):
2931 rows = _db.execute("""
2932 SELECT * FROM host_queue_entries
2933 WHERE job_id= %s
2934 """, (self.id,))
2935 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002936
jadmanski0afbb632008-06-06 21:10:57 +00002937        assert len(entries) > 0
mbligh36768f02008-02-22 18:28:33 +00002938
jadmanski0afbb632008-06-06 21:10:57 +00002939 return entries
mbligh36768f02008-02-22 18:28:33 +00002940
2941
jadmanski0afbb632008-06-06 21:10:57 +00002942 def set_status(self, status, update_queues=False):
2943        self.update_field('status', status)
2944
2945 if update_queues:
2946 for queue_entry in self.get_host_queue_entries():
2947 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002948
2949
showard77182562009-06-10 00:16:05 +00002950 def _atomic_and_has_started(self):
2951 """
2952 @returns True if any of the HostQueueEntries associated with this job
2953 have entered the Status.STARTING state or beyond.
2954 """
2955 atomic_entries = models.HostQueueEntry.objects.filter(
2956 job=self.id, atomic_group__isnull=False)
2957 if atomic_entries.count() <= 0:
2958 return False
2959
showardaf8b4ca2009-06-16 18:47:26 +00002960 # These states may *only* be reached if Job.run() has been called.
2961 started_statuses = (models.HostQueueEntry.Status.STARTING,
2962 models.HostQueueEntry.Status.RUNNING,
2963 models.HostQueueEntry.Status.COMPLETED)
2964
2965 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00002966 return started_entries.count() > 0
2967
2968
showard708b3522009-08-20 23:26:15 +00002969 def _hosts_assigned_count(self):
2970 """The number of HostQueueEntries assigned a Host for this job."""
2971 entries = models.HostQueueEntry.objects.filter(job=self.id,
2972 host__isnull=False)
2973 return entries.count()
2974
2975
showard77182562009-06-10 00:16:05 +00002976 def _pending_count(self):
2977 """The number of HostQueueEntries for this job in the Pending state."""
2978 pending_entries = models.HostQueueEntry.objects.filter(
2979 job=self.id, status=models.HostQueueEntry.Status.PENDING)
2980 return pending_entries.count()
2981
2982
jadmanski0afbb632008-06-06 21:10:57 +00002983 def is_ready(self):
showard77182562009-06-10 00:16:05 +00002984 # NOTE: Atomic group jobs stop reporting ready after they have been
2985 # started to avoid launching multiple copies of one atomic job.
2986        # Only possible if synch_count is less than half the number of
2987 # machines in the atomic group.
showardb000a8d2009-07-28 20:02:07 +00002988 pending_count = self._pending_count()
2989 atomic_and_has_started = self._atomic_and_has_started()
2990 ready = (pending_count >= self.synch_count
2991                 and not atomic_and_has_started)
2992
2993 if not ready:
2994 logging.info(
2995 'Job %s not ready: %s pending, %s required '
2996 '(Atomic and started: %s)',
2997 self, pending_count, self.synch_count,
2998 atomic_and_has_started)
2999
3000 return ready
mbligh36768f02008-02-22 18:28:33 +00003001
3002
jadmanski0afbb632008-06-06 21:10:57 +00003003    def num_machines(self, clause=None):
3004 sql = "job_id=%s" % self.id
3005 if clause:
3006 sql += " AND (%s)" % clause
3007 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00003008
3009
jadmanski0afbb632008-06-06 21:10:57 +00003010 def num_queued(self):
3011 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00003012
3013
jadmanski0afbb632008-06-06 21:10:57 +00003014 def num_active(self):
3015 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00003016
3017
jadmanski0afbb632008-06-06 21:10:57 +00003018 def num_complete(self):
3019 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00003020
3021
jadmanski0afbb632008-06-06 21:10:57 +00003022 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00003023 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00003024
mbligh36768f02008-02-22 18:28:33 +00003025
showard6bb7c292009-01-30 01:44:51 +00003026 def _not_yet_run_entries(self, include_verifying=True):
3027 statuses = [models.HostQueueEntry.Status.QUEUED,
3028 models.HostQueueEntry.Status.PENDING]
3029 if include_verifying:
3030 statuses.append(models.HostQueueEntry.Status.VERIFYING)
3031 return models.HostQueueEntry.objects.filter(job=self.id,
3032 status__in=statuses)
3033
3034
3035 def _stop_all_entries(self):
3036 entries_to_stop = self._not_yet_run_entries(
3037 include_verifying=False)
3038 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00003039 assert not child_entry.complete, (
3040 '%s status=%s, active=%s, complete=%s' %
3041 (child_entry.id, child_entry.status, child_entry.active,
3042 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00003043 if child_entry.status == models.HostQueueEntry.Status.PENDING:
3044 child_entry.host.status = models.Host.Status.READY
3045 child_entry.host.save()
3046 child_entry.status = models.HostQueueEntry.Status.STOPPED
3047 child_entry.save()
3048
showard2bab8f42008-11-12 18:15:22 +00003049 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00003050 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00003051 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00003052 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00003053
3054
jadmanski0afbb632008-06-06 21:10:57 +00003055 def write_to_machines_file(self, queue_entry):
3056 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00003057 file_path = os.path.join(self.tag(), '.machines')
3058 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00003059
3060
showardf1ae3542009-05-11 19:26:02 +00003061 def _next_group_name(self, group_name=''):
3062 """@returns a directory name to use for the next host group results."""
3063 if group_name:
3064 # Sanitize for use as a pathname.
3065 group_name = group_name.replace(os.path.sep, '_')
3066 if group_name.startswith('.'):
3067 group_name = '_' + group_name[1:]
3068 # Add a separator between the group name and 'group%d'.
3069 group_name += '.'
3070 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00003071 query = models.HostQueueEntry.objects.filter(
3072 job=self.id).values('execution_subdir').distinct()
3073 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00003074 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
3075 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00003076 if ids:
3077 next_id = max(ids) + 1
3078 else:
3079 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00003080 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00003081
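    # Worked example (hypothetical values): if this job already has
    # execution_subdirs 'rack1.group0' and 'rack1.group1', a call with
    # group_name='rack1' returns 'rack1.group2'; with no matching subdirs it
    # returns 'rack1.group0'.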
3082
showarddb502762009-09-09 15:31:20 +00003083 def _write_control_file(self, execution_path):
showard170873e2009-01-07 00:22:26 +00003084 control_path = _drone_manager.attach_file_to_execution(
showarddb502762009-09-09 15:31:20 +00003085 execution_path, self.control_file)
showard170873e2009-01-07 00:22:26 +00003086 return control_path
mbligh36768f02008-02-22 18:28:33 +00003087
showardb2e2c322008-10-14 17:33:55 +00003088
showard2bab8f42008-11-12 18:15:22 +00003089 def get_group_entries(self, queue_entry_from_group):
3090 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00003091 return list(HostQueueEntry.fetch(
3092 where='job_id=%s AND execution_subdir=%s',
3093 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00003094
3095
showard8cc058f2009-09-08 16:26:33 +00003096 def get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00003097 assert queue_entries
showarddb502762009-09-09 15:31:20 +00003098 execution_path = queue_entries[0].execution_path()
3099 control_path = self._write_control_file(execution_path)
jadmanski0afbb632008-06-06 21:10:57 +00003100 hostnames = ','.join([entry.get_host().hostname
3101 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00003102
showarddb502762009-09-09 15:31:20 +00003103 execution_tag = queue_entries[0].execution_tag()
showard87ba02a2009-04-20 19:37:32 +00003104 params = _autoserv_command_line(
showarded2afea2009-07-07 20:54:07 +00003105 hostnames,
showard87ba02a2009-04-20 19:37:32 +00003106 ['-P', execution_tag, '-n',
3107 _drone_manager.absolute_path(control_path)],
showarde9c69362009-06-30 01:58:03 +00003108 job=self, verbose=False)
mbligh36768f02008-02-22 18:28:33 +00003109
jadmanski0afbb632008-06-06 21:10:57 +00003110 if not self.is_server_job():
3111 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00003112
showardb2e2c322008-10-14 17:33:55 +00003113 return params
mblighe2586682008-02-29 22:45:46 +00003114
mbligh36768f02008-02-22 18:28:33 +00003115
showardc9ae1782009-01-30 01:42:37 +00003116 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00003117 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00003118 return True
showard0fc38302008-10-23 00:44:07 +00003119 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00003120 return queue_entry.get_host().dirty
3121 return False
showard21baa452008-10-21 00:08:39 +00003122
showardc9ae1782009-01-30 01:42:37 +00003123
showard8cc058f2009-09-08 16:26:33 +00003124 def _should_run_verify(self, queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003125 do_not_verify = (queue_entry.host.protection ==
3126 host_protections.Protection.DO_NOT_VERIFY)
3127 if do_not_verify:
3128 return False
3129 return self.run_verify
3130
3131
showard8cc058f2009-09-08 16:26:33 +00003132 def schedule_pre_job_tasks(self, queue_entry):
showard77182562009-06-10 00:16:05 +00003133 """
3134        Schedule any pre-job task (such as Cleanup or Verify) that must run
3135        on the host before this host_queue_entry may be used to run this Job.
3136
3137        If no pre-job task is needed, HostQueueEntry.on_pending() is called
mbligh6fbdb802009-08-03 16:42:55 +00003138        directly, which continues the flow of the job.
3141 """
showardc9ae1782009-01-30 01:42:37 +00003142 if self._should_run_cleanup(queue_entry):
showard8cc058f2009-09-08 16:26:33 +00003143 task = models.SpecialTask.Task.CLEANUP
3144 elif self._should_run_verify(queue_entry):
3145 task = models.SpecialTask.Task.VERIFY
3146 else:
3147 queue_entry.on_pending()
3148 return
3149
3150 models.SpecialTask.objects.create(
3151 host=models.Host(id=queue_entry.host_id),
3152 queue_entry=models.HostQueueEntry(id=queue_entry.id),
3153 task=task)
showard21baa452008-10-21 00:08:39 +00003154
3155
showardf1ae3542009-05-11 19:26:02 +00003156 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00003157 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00003158 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00003159 else:
showardf1ae3542009-05-11 19:26:02 +00003160 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00003161 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00003162 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00003163 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003164
3165 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00003166 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003167
3168
3169 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00003170 """
3171        @returns A list of HostQueueEntry instances to be used to run this
3172                 Job.  The chosen entries are assigned a shared execution
showard54c1ea92009-05-20 00:32:58 +00003173                 subdir (group name) in the results database before returning.
showardf1ae3542009-05-11 19:26:02 +00003174 """
showard77182562009-06-10 00:16:05 +00003175 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00003176 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00003177 if atomic_group:
3178 num_entries_wanted = atomic_group.max_number_of_machines
3179 else:
3180 num_entries_wanted = self.synch_count
3181 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00003182
showardf1ae3542009-05-11 19:26:02 +00003183 if num_entries_wanted > 0:
3184 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00003185 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00003186 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00003187 params=(self.id, include_queue_entry.id)))
3188
3189 # Sort the chosen hosts by hostname before slicing.
3190 def cmp_queue_entries_by_hostname(entry_a, entry_b):
3191 return Host.cmp_for_sort(entry_a.host, entry_b.host)
3192 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
3193 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00003194
showardf1ae3542009-05-11 19:26:02 +00003195 # Sanity check. We'll only ever be called if this can be met.
3196 assert len(chosen_entries) >= self.synch_count
3197
showard8cc058f2009-09-08 16:26:33 +00003198 group_name = include_queue_entry.get_group_name()
showardf1ae3542009-05-11 19:26:02 +00003199
3200 self._assign_new_group(chosen_entries, group_name=group_name)
showard8cc058f2009-09-08 16:26:33 +00003201 return chosen_entries
showard2bab8f42008-11-12 18:15:22 +00003202
3203
showard77182562009-06-10 00:16:05 +00003204 def run_if_ready(self, queue_entry):
3205 """
3206        Run this job if enough hosts are ready for it to run; atomic group
3207        jobs may instead start a delay via run_with_ready_delay().
3208        If this Job is not ready to run, excess Pending entries may be
3209        stopped via stop_if_necessary().
3210 """
showardb2e2c322008-10-14 17:33:55 +00003211 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003212 self.stop_if_necessary()
showard8cc058f2009-09-08 16:26:33 +00003213 elif queue_entry.atomic_group:
3214 self.run_with_ready_delay(queue_entry)
3215 else:
3216 self.run(queue_entry)
showard77182562009-06-10 00:16:05 +00003217
3218
3219 def run_with_ready_delay(self, queue_entry):
3220 """
3221 Start a delay to wait for more hosts to enter Pending state before
3222        launching an atomic group job.  Once set, the delay cannot be reset.
3223
3224 @param queue_entry: The HostQueueEntry object to get atomic group
3225 info from and pass to run_if_ready when the delay is up.
3226
3227        If the delay is disabled, has already expired, or enough entries are
3228        already Pending, the job is run immediately instead.
3229 """
3230 assert queue_entry.job_id == self.id
3231 assert queue_entry.atomic_group
3232 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
showard708b3522009-08-20 23:26:15 +00003233 pending_threshold = min(self._hosts_assigned_count(),
3234 queue_entry.atomic_group.max_number_of_machines)
showard77182562009-06-10 00:16:05 +00003235 over_max_threshold = (self._pending_count() >= pending_threshold)
3236 delay_expired = (self._delay_ready_task and
3237 time.time() >= self._delay_ready_task.end_time)
3238
3239 # Delay is disabled or we already have enough? Do not wait to run.
3240 if not delay or over_max_threshold or delay_expired:
showard8cc058f2009-09-08 16:26:33 +00003241 self.run(queue_entry)
3242 else:
3243 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
showard77182562009-06-10 00:16:05 +00003244
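    # Note: when the entry is left in WAITING, the dispatcher presumably calls
    # schedule_delayed_callback_task() below; its DelayedCallTask runs the job
    # once the delay elapses, and _finish_run() aborts the pending callback if
    # the job starts earlier.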
showard8cc058f2009-09-08 16:26:33 +00003245
3246 def schedule_delayed_callback_task(self, queue_entry):
3247 queue_entry.set_status(models.HostQueueEntry.Status.PENDING)
3248
showard77182562009-06-10 00:16:05 +00003249 if self._delay_ready_task:
3250 return None
3251
showard8cc058f2009-09-08 16:26:33 +00003252 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3253
showard77182562009-06-10 00:16:05 +00003254 def run_job_after_delay():
3255 logging.info('Job %s done waiting for extra hosts.', self.id)
3256 return self.run(queue_entry)
3257
showard708b3522009-08-20 23:26:15 +00003258 logging.info('Job %s waiting up to %s seconds for more hosts.',
3259 self.id, delay)
showard77182562009-06-10 00:16:05 +00003260 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3261 callback=run_job_after_delay)
showard8cc058f2009-09-08 16:26:33 +00003262 return self._delay_ready_task
showard77182562009-06-10 00:16:05 +00003263
3264
3265 def run(self, queue_entry):
3266 """
3267 @param queue_entry: The HostQueueEntry instance calling this method.
showard77182562009-06-10 00:16:05 +00003268 """
3269 if queue_entry.atomic_group and self._atomic_and_has_started():
3270 logging.error('Job.run() called on running atomic Job %d '
3271 'with HQE %s.', self.id, queue_entry)
showard8cc058f2009-09-08 16:26:33 +00003272 return
3273 queue_entries = self._choose_group_to_run(queue_entry)
3274 self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00003275
3276
showard8cc058f2009-09-08 16:26:33 +00003277 def _finish_run(self, queue_entries):
showardb2ccdda2008-10-28 20:39:05 +00003278 for queue_entry in queue_entries:
showard8cc058f2009-09-08 16:26:33 +00003279 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showard77182562009-06-10 00:16:05 +00003280 if self._delay_ready_task:
3281 # Cancel any pending callback that would try to run again
3282 # as we are already running.
3283 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003284
showardb000a8d2009-07-28 20:02:07 +00003285 def __str__(self):
3286 return '%s-%s' % (self.id, self.owner)
3287
3288
mbligh36768f02008-02-22 18:28:33 +00003289if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003290 main()