#!/usr/bin/python -u

"""
Autotest scheduler
"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
# Pid-file name prefixes used by utils.write_pid()/process_is_alive().
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Default results directory; overwritten from the command line in
# main_without_exception_handling().
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow an environment variable to relocate the autotest tree
# (idiomatic membership test instead of the deprecated dict.has_key()).
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min

_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level mutable state, initialized in init()/main.
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000064
65
mbligh83c1e9e2009-05-01 23:10:41 +000066def _site_init_monitor_db_dummy():
67 return {}
68
69
def main():
    """
    Scheduler entry point.

    Runs the real main loop, logging any escaping exception, and always
    deletes this process's pid file on the way out.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # Deliberate exits (bad args, disabled scheduler) pass through
            # without being logged as errors.
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        # Remove our pid file even on crash so a later start isn't confused
        # by a stale one.
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000081
82
def main_without_exception_handling():
    """
    Parse options, read global configuration, and run the scheduler loop.

    Side effects: sets the module globals RESULTS_DIR, _notify_email_statuses,
    _autoserv_path, _testing_mode and _base_url; starts the status server;
    runs the dispatcher until _shutdown is set (see handle_sigint).
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        # NOTE: fixed grammar of the operator-facing message ("to enable it").
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Allow a site-specific initialization hook to run, if one exists.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # Deliberate catch-all: email the stacktrace, then fall through to
        # the orderly shutdown below.
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000170
171
def setup_logging():
    """Configure scheduler logging, honoring optional environment overrides."""
    env = os.environ
    log_dir = env.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = env.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(),
        log_dir=log_dir, logfile_name=log_name)
178
179
def handle_sigint(signum, frame):
    """
    SIGINT handler: flag the main loop (while not _shutdown) to exit after
    the current tick completes.

    @param signum: Signal number (unused).
    @param frame: Interrupted stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000184
185
def init():
    """
    One-time scheduler startup: claim the pid file, connect to the database,
    install the SIGINT handler and initialize the drone manager.

    Exits the process if another monitor_db is already running.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.process_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # Renamed from 'drones' to avoid shadowing the imported drones module.
    drone_hostnames_csv = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip()
                  for hostname in drone_hostnames_csv.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000220
221
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - bool - If True, pass --verbose to autoserv.

    @returns The autoserv command line as a list of executable + parameters.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', drone_manager.WORKING_DIRECTORY]
    if job or queue_entry:
        job = job or queue_entry.job
        command += ['-u', job.owner, '-l', job.name]
    if verbose:
        command.append('--verbose')
    return command + extra_args
244
245
class SchedulerError(Exception):
    """Raised by HostScheduler when it detects an inconsistent state."""
248
249
class HostScheduler(object):
    """
    Matches pending HostQueueEntries against the pool of ready hosts.

    refresh() snapshots scheduling state (ready hosts, ACLs, labels and job
    dependencies) from the database; the find_eligible_* methods then consume
    that cached state, popping hosts from the in-memory pool as they are
    claimed during a scheduling cycle.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready hosts with no active HQE."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        # Render ids as the comma-separated body of an SQL "IN (...)" clause.
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """
        Run *query* (containing one %s placeholder for an id list) and fold
        the resulting (left, right) id pairs into {left: set(right)}.
        Returns {} without touching the database when id_list is empty.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        # Fold (left, right) id pairs into {left: set(rights)}; flip=True
        # reverses the direction of the mapping.
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        # Map each job id to the ACL group ids its owner belongs to.
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        # Map each job id to host ids explicitly marked ineligible for it.
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        # Map each job id to the label ids it depends on.
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        # Map each host id to the ACL group ids that grant access to it.
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (labels_to_hosts, hosts_to_labels) mappings for host_ids."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        # All labels keyed by id (invalid labels included; filtered where
        # relevant, e.g. in _get_atomic_group_labels).
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Snapshot all database state needed for this scheduling cycle."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        # The job's owner must share at least one ACL group with the host.
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        # Every label the job depends on must be present on the host.
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """
        Return False when the host carries an only_if_needed label that the
        job neither depends on nor requested as its metahost.
        """
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  If more than one atomic group is
        found in the set of labels, an error is logged and an arbitrary one
        of them is returned.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """All eligibility checks (ACL, dependencies, atomic group) combined."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        # Truthy when the host is known to this cycle and flagged invalid.
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        # Claim the specific host the entry requested, if still eligible.
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim any one eligible host carrying the entry's metahost label."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return a Host for *queue_entry*, or None if none is available now."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
613
614
showard170873e2009-01-07 00:22:26 +0000615class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000616 def __init__(self):
617 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000618 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000619 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000620 user_cleanup_time = scheduler_config.config.clean_interval
621 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
622 _db, user_cleanup_time)
623 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000624 self._host_agents = {}
625 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000626
mbligh36768f02008-02-22 18:28:33 +0000627
    def initialize(self, recover_hosts=True):
        """
        One-time startup work: prime the cleanup tasks and recover state
        left over from a previous scheduler run.

        @param recover_hosts: If True, also run host recovery.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000637
638
    def tick(self):
        """
        Run one scheduler iteration: refresh drone state, perform cleanup,
        process aborts/reverifies/recurring runs, schedule new jobs, drive
        existing agents, then flush queued drone actions and emails.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._find_reverify()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000649
showard97aed502008-11-04 02:01:24 +0000650
    def _run_cleanup(self):
        # Called every tick; run_cleanup_maybe() presumably no-ops until each
        # task's own interval has elapsed — confirm in monitor_db_cleanup.
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000654
mbligh36768f02008-02-22 18:28:33 +0000655
showard170873e2009-01-07 00:22:26 +0000656 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
657 for object_id in object_ids:
658 agent_dict.setdefault(object_id, set()).add(agent)
659
660
661 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
662 for object_id in object_ids:
663 assert object_id in agent_dict
664 agent_dict[object_id].remove(agent)
665
666
    def add_agent(self, agent):
        """
        Take ownership of agent: track it, point it back at this dispatcher,
        and index it by every host id and queue entry id it touches so
        host_has_agent()/get_agents_for_entry() can find it.
        """
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000673
showard170873e2009-01-07 00:22:26 +0000674
675 def get_agents_for_entry(self, queue_entry):
676 """
677 Find agents corresponding to the specified queue_entry.
678 """
showardd3dc1992009-04-22 21:01:40 +0000679 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000680
681
682 def host_has_agent(self, host):
683 """
684 Determine if there is currently an Agent present using this host.
685 """
686 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000687
688
    def remove_agent(self, agent):
        """
        Drop a finished agent: stop tracking it and remove it from both the
        per-host and per-queue-entry indexes (inverse of add_agent).
        """
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000695
696
    def _recover_processes(self):
        """
        Reattach to processes left running by a previous scheduler instance.
        Ordering is significant: pidfiles must be registered before the drone
        refresh reads them, entries are recovered before the remaining-state
        sanity checks, and drones are reinitialized only after orphan kills
        have been executed.
        """
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_starting_entries()
        self._check_for_remaining_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000708
showard170873e2009-01-07 00:22:26 +0000709
    def _register_pidfiles(self):
        """
        Register every pidfile that recovery may need to read: one per known
        pidfile name, for each not-yet-finished queue entry and each active
        special task.
        """
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
            where="status IN ('Running', 'Gathering', 'Parsing')")
        special_tasks = models.SpecialTask.objects.filter(is_active=True)
        for execution_entry in itertools.chain(queue_entries, special_tasks):
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                    execution_entry.execution_path(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000721
722
showarded2afea2009-07-07 20:54:07 +0000723 def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
724 run_monitor = PidfileRunMonitor()
725 run_monitor.attach_to_existing_process(execution_path,
726 pidfile_name=pidfile_name)
727 if run_monitor.has_process():
728 orphans.discard(run_monitor.get_process())
729 return run_monitor, '(process %s)' % run_monitor.get_process()
730 return None, 'without process'
731
732
showardd3dc1992009-04-22 21:01:40 +0000733 def _recover_entries_with_status(self, status, orphans, pidfile_name,
734 recover_entries_fn):
735 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000736 for queue_entry in queue_entries:
737 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000738 # synchronous job we've already recovered
739 continue
showardd3dc1992009-04-22 21:01:40 +0000740 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showarded2afea2009-07-07 20:54:07 +0000741 run_monitor, process_string = self._get_recovery_run_monitor(
742 queue_entry.execution_path(), pidfile_name, orphans)
showard597bfd32009-05-08 18:22:50 +0000743
showarded2afea2009-07-07 20:54:07 +0000744 logging.info('Recovering %s entry %s %s',status.lower(),
745 ', '.join(str(entry) for entry in queue_entries),
746 process_string)
showardd3dc1992009-04-22 21:01:40 +0000747 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000748
749
showard6878e8b2009-07-20 22:37:45 +0000750 def _check_for_remaining_orphan_processes(self, orphans):
751 if not orphans:
752 return
753 subject = 'Unrecovered orphan autoserv processes remain'
754 message = '\n'.join(str(process) for process in orphans)
755 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000756
757 die_on_orphans = global_config.global_config.get_config_value(
758 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
759
760 if die_on_orphans:
761 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000762
showard170873e2009-01-07 00:22:26 +0000763
    def _recover_running_entries(self, orphans):
        """
        Recover 'Running' queue entries: entries whose autoserv process
        survived get a QueueTask attached to the existing monitor; entries
        without a process are simply requeued.
        """
        def recover_entries(job, queue_entries, run_monitor):
            if run_monitor is not None:
                queue_task = QueueTask(job=job, queue_entries=queue_entries,
                                       recover_run_monitor=run_monitor)
                # one process per entry in the group
                self.add_agent(Agent(tasks=[queue_task],
                                     num_processes=len(queue_entries)))
            else:
                # we could do better, but this retains legacy behavior for now
                for queue_entry in queue_entries:
                    logging.info('Requeuing running HQE %s since it has no '
                                 'process' % queue_entry)
                    queue_entry.requeue()

        self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
                                          orphans, _AUTOSERV_PID_FILE,
                                          recover_entries)
781
782
    def _recover_gathering_entries(self, orphans):
        """
        Recover 'Gathering' queue entries by attaching a GatherLogsTask to
        the crashinfo-collection pidfile (monitor may be None if the
        process is gone).
        """
        def recover_entries(job, queue_entries, run_monitor):
            gather_task = GatherLogsTask(job, queue_entries,
                                         recover_run_monitor=run_monitor)
            self.add_agent(Agent([gather_task]))

        self._recover_entries_with_status(
            models.HostQueueEntry.Status.GATHERING,
            orphans, _CRASHINFO_PID_FILE, recover_entries)
792
793
    def _recover_parsing_entries(self, orphans):
        """
        Recover 'Parsing' queue entries with a FinalReparseTask.  Parsing
        agents are accounted as zero processes for throttling purposes.
        """
        def recover_entries(job, queue_entries, run_monitor):
            reparse_task = FinalReparseTask(queue_entries,
                                            recover_run_monitor=run_monitor)
            self.add_agent(Agent([reparse_task], num_processes=0))

        self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
                                          orphans, _PARSER_PID_FILE,
                                          recover_entries)
803
804
    def _recover_all_recoverable_entries(self):
        """
        Recover every recoverable entry type against the current set of
        orphaned autoserv processes; whatever remains unclaimed afterwards
        is reported (and may be fatal, per config).
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._recover_special_tasks(orphans)
        self._check_for_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000812
showard97aed502008-11-04 02:01:24 +0000813
showarded2afea2009-07-07 20:54:07 +0000814 def _recover_special_tasks(self, orphans):
showard2fe3f1d2009-07-06 20:19:11 +0000815 """\
816 Recovers all special tasks that have started running but have not
817 completed.
818 """
819
820 tasks = models.SpecialTask.objects.filter(is_active=True,
821 is_complete=False)
822 # Use ordering to force NULL queue_entry_id's to the end of the list
showarda5288b42009-07-28 20:06:08 +0000823 for task in tasks.order_by('-queue_entry__id'):
showard9b6ec502009-08-20 23:25:17 +0000824 if self.host_has_agent(task.host):
825 raise SchedulerError(
826 "%s already has a host agent %s." % (
827 task, self._host_agents.get(host.id)))
showard2fe3f1d2009-07-06 20:19:11 +0000828
829 host = Host(id=task.host.id)
830 queue_entry = None
831 if task.queue_entry:
showarded2afea2009-07-07 20:54:07 +0000832 queue_entry = HostQueueEntry(id=task.queue_entry.id)
showard2fe3f1d2009-07-06 20:19:11 +0000833
showarded2afea2009-07-07 20:54:07 +0000834 run_monitor, process_string = self._get_recovery_run_monitor(
835 task.execution_path(), _AUTOSERV_PID_FILE, orphans)
836
837 logging.info('Recovering %s %s', task, process_string)
838 self._recover_special_task(task, host, queue_entry, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000839
840
showarded2afea2009-07-07 20:54:07 +0000841 def _recover_special_task(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000842 """\
843 Recovers a single special task.
844 """
845 if task.task == models.SpecialTask.Task.VERIFY:
showarded2afea2009-07-07 20:54:07 +0000846 agent_tasks = self._recover_verify(task, host, queue_entry,
847 run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000848 elif task.task == models.SpecialTask.Task.REPAIR:
showarded2afea2009-07-07 20:54:07 +0000849 agent_tasks = self._recover_repair(task, host, queue_entry,
850 run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000851 elif task.task == models.SpecialTask.Task.CLEANUP:
showarded2afea2009-07-07 20:54:07 +0000852 agent_tasks = self._recover_cleanup(task, host, queue_entry,
853 run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000854 else:
855 # Should never happen
856 logging.error(
857 "Special task id %d had invalid task %s", (task.id, task.task))
858
859 self.add_agent(Agent(agent_tasks))
860
861
showarded2afea2009-07-07 20:54:07 +0000862 def _recover_verify(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000863 """\
864 Recovers a verify task.
865 No associated queue entry: Verify host
866 With associated queue entry: Verify host, and run associated queue
867 entry
868 """
869 if not task.queue_entry:
showarded2afea2009-07-07 20:54:07 +0000870 return [VerifyTask(host=host, task=task,
871 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000872 else:
showarded2afea2009-07-07 20:54:07 +0000873 return [VerifyTask(queue_entry=queue_entry, task=task,
874 recover_run_monitor=run_monitor),
showard2fe3f1d2009-07-06 20:19:11 +0000875 SetEntryPendingTask(queue_entry=queue_entry)]
876
877
    def _recover_repair(self, task, host, queue_entry, run_monitor):
        """\
        Recovers a repair task.
        Always repair host
        """
        return [RepairTask(host=host, queue_entry=queue_entry, task=task,
                           recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000885
886
showarded2afea2009-07-07 20:54:07 +0000887 def _recover_cleanup(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000888 """\
889 Recovers a cleanup task.
890 No associated queue entry: Clean host
891 With associated queue entry: Clean host, verify host if needed, and
892 run associated queue entry
893 """
894 if not task.queue_entry:
showarded2afea2009-07-07 20:54:07 +0000895 return [CleanupTask(host=host, task=task,
896 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000897 else:
898 agent_tasks = [CleanupTask(queue_entry=queue_entry,
showarded2afea2009-07-07 20:54:07 +0000899 task=task,
900 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000901 if queue_entry.job.should_run_verify(queue_entry):
902 agent_tasks.append(VerifyTask(queue_entry=queue_entry))
903 agent_tasks.append(
904 SetEntryPendingTask(queue_entry=queue_entry))
905 return agent_tasks
906
907
    def _requeue_starting_entries(self):
        # temporary measure until we implement proper recovery of Starting HQEs
        for entry in HostQueueEntry.fetch(where='status="Starting"'):
            logging.info('Requeuing "Starting" queue entry %s' % entry)
            # a Starting entry should never have been recovered above, and
            # its host is expected to still be Pending
            assert not self.get_agents_for_entry(entry)
            assert entry.host.status == models.Host.Status.PENDING
            self._reverify_hosts_where('id = %s' % entry.host.id)
            entry.requeue()
916
917
showard6878e8b2009-07-20 22:37:45 +0000918 def _check_for_remaining_active_entries(self):
showard170873e2009-01-07 00:22:26 +0000919 queue_entries = HostQueueEntry.fetch(
showard70a294f2009-08-20 23:33:21 +0000920 where='active AND NOT complete AND status != "Pending"')
showardd3dc1992009-04-22 21:01:40 +0000921
showarde8e37072009-08-20 23:31:30 +0000922 unrecovered_active_hqes = [entry for entry in queue_entries
923 if not self.get_agents_for_entry(entry)]
924 if unrecovered_active_hqes:
925 message = '\n'.join(str(hqe) for hqe in unrecovered_active_hqes)
926 raise SchedulerError(
927 '%d unrecovered active host queue entries:\n%s' %
928 (len(unrecovered_active_hqes), message))
showard170873e2009-01-07 00:22:26 +0000929
930
    def _find_reverify(self):
        """
        Start a VerifyTask for every pending, host-only verify SpecialTask
        (no queue entry attached) whose host is usable and not already
        owned by an agent.
        """
        tasks = models.SpecialTask.objects.filter(
            task=models.SpecialTask.Task.VERIFY, is_active=False,
            is_complete=False, queue_entry__isnull=True)

        for task in tasks:
            # fetch the scheduler-side Host object for this task's host
            host = Host.fetch(where='id = %s', params=(task.host.id,)).next()
            if host.locked or host.invalid or self.host_has_agent(host):
                continue

            logging.info('Force reverifying host %s', host.hostname)
            self.add_agent(Agent([VerifyTask(host=host, task=task)]))
showard1ff7b2e2009-05-15 23:17:18 +0000943
944
    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
            "status IN ('Repairing', 'Verifying', 'Cleaning', 'Running')",
            print_message=message)
mblighbb421852008-03-11 22:36:16 +0000953
954
    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        """
        Schedule reverify tasks for every unlocked, valid host matching the
        given SQL condition that is not already covered by an agent.

        @param where: SQL fragment selecting hosts (ANDed with the
                locked/invalid filter).
        @param print_message: log format with one %s for the hostname, or
                a falsy value to suppress logging.
        """
        full_where='locked = 0 AND invalid = 0 AND ' + where
        for host in Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            tasks = host.reverify_tasks()
            self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000966
967
    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000973
974
showard04c82c52008-05-29 19:38:12 +0000975
    def _get_pending_queue_entries(self):
        """
        Fetch all schedulable queue entries as a list.
        """
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000982
983
showard89f84db2009-03-12 20:39:13 +0000984 def _refresh_pending_queue_entries(self):
985 """
986 Lookup the pending HostQueueEntries and call our HostScheduler
987 refresh() method given that list. Return the list.
988
989 @returns A list of pending HostQueueEntries sorted in priority order.
990 """
showard63a34772008-08-18 19:32:50 +0000991 queue_entries = self._get_pending_queue_entries()
992 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000993 return []
showardb95b1bd2008-08-15 18:11:04 +0000994
showard63a34772008-08-18 19:32:50 +0000995 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000996
showard89f84db2009-03-12 20:39:13 +0000997 return queue_entries
998
999
1000 def _schedule_atomic_group(self, queue_entry):
1001 """
1002 Schedule the given queue_entry on an atomic group of hosts.
1003
1004 Returns immediately if there are insufficient available hosts.
1005
1006 Creates new HostQueueEntries based off of queue_entry for the
1007 scheduled hosts and starts them all running.
1008 """
1009 # This is a virtual host queue entry representing an entire
1010 # atomic group, find a group and schedule their hosts.
1011 group_hosts = self._host_scheduler.find_eligible_atomic_group(
1012 queue_entry)
1013 if not group_hosts:
1014 return
showardcbe6f942009-06-17 19:33:49 +00001015
1016 logging.info('Expanding atomic group entry %s with hosts %s',
1017 queue_entry,
1018 ', '.join(host.hostname for host in group_hosts))
showard89f84db2009-03-12 20:39:13 +00001019 # The first assigned host uses the original HostQueueEntry
1020 group_queue_entries = [queue_entry]
1021 for assigned_host in group_hosts[1:]:
1022 # Create a new HQE for every additional assigned_host.
1023 new_hqe = HostQueueEntry.clone(queue_entry)
1024 new_hqe.save()
1025 group_queue_entries.append(new_hqe)
1026 assert len(group_queue_entries) == len(group_hosts)
1027 for queue_entry, host in itertools.izip(group_queue_entries,
1028 group_hosts):
1029 self._run_queue_entry(queue_entry, host)
1030
1031
1032 def _schedule_new_jobs(self):
1033 queue_entries = self._refresh_pending_queue_entries()
1034 if not queue_entries:
1035 return
1036
showard63a34772008-08-18 19:32:50 +00001037 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +00001038 if (queue_entry.atomic_group_id is None or
1039 queue_entry.host_id is not None):
1040 assigned_host = self._host_scheduler.find_eligible_host(
1041 queue_entry)
1042 if assigned_host:
1043 self._run_queue_entry(queue_entry, assigned_host)
1044 else:
1045 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +00001046
1047
    def _run_queue_entry(self, queue_entry, host):
        # the entry builds its own pre-job agent (verify/cleanup etc.) for
        # the assigned host; we just take ownership of it
        agent = queue_entry.run_pre_job_tasks(assigned_host=host)
        self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +00001051
1052
    def _find_aborting(self):
        """
        Abort every queue entry flagged aborted but not yet complete, along
        with all agents currently working on it.
        """
        for entry in HostQueueEntry.fetch(where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001059
1060
showard324bf812009-01-20 23:23:38 +00001061 def _can_start_agent(self, agent, num_started_this_cycle,
1062 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001063 # always allow zero-process agents to run
1064 if agent.num_processes == 0:
1065 return True
1066 # don't allow any nonzero-process agents to run after we've reached a
1067 # limit (this avoids starvation of many-process agents)
1068 if have_reached_limit:
1069 return False
1070 # total process throttling
showard324bf812009-01-20 23:23:38 +00001071 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +00001072 return False
1073 # if a single agent exceeds the per-cycle throttling, still allow it to
1074 # run when it's the first agent in the cycle
1075 if num_started_this_cycle == 0:
1076 return True
1077 # per-cycle throttling
1078 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +00001079 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001080 return False
1081 return True
1082
1083
    def _handle_agents(self):
        """
        Drive all agents one step: remove finished agents, start not-yet-
        running ones (subject to throttling), and tick the rest.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                # only count processes of agents started this cycle
                num_started_this_cycle += agent.num_processes
            agent.tick()
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001102
1103
    def _process_recurring_runs(self):
        """
        Instantiate jobs for every RecurringRun whose start date has passed,
        then advance or delete the recurrence record.
        """
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            # NOTE(review): "dependencies" is extracted here but never passed
            # to create_new_job below - confirm whether that is intentional.
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            # one-time hosts must be (re)created before job creation
            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                # last iteration - retire the recurrence
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1143
1144
class PidfileRunMonitor(object):
    """
    Tracks a single autoserv-style process through its pidfile.

    Client must call either run() to start a new process or
    attach_to_existing_process().  Afterwards has_process()/get_process(),
    exit_code() and num_tests_failed() report on the process, reading state
    lazily from the pidfile via the drone manager.
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # set once we give up on the process (no pid / no exit code)
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        """Prefix command with 'nice -n <level>' unless nice_level is falsy."""
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        # used by _handle_no_process() to time out pidfile creation
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """
        Start command on a drone under the given working directory.

        @param nice_level: optional niceness; applied via _add_nice_command
                (a level of 0 or None leaves the command unchanged).
        """
        assert command is not None
        # Fix: reuse the _add_nice_command helper instead of duplicating its
        # logic inline (the helper was otherwise dead code).
        command = self._add_nice_command(command, nice_level)
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            log_file=log_file, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        """Monitor an already-running process via its pidfile."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_path, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        """Kill the monitored process, if one exists."""
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        """@returns True if a process has been found for this pidfile."""
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        """@returns the monitored process; only valid if has_process()."""
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        """
        Refresh self._state from the pidfile; raises _PidfileException on
        invalid contents (and resets state).
        """
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        # notify and then treat the process as lost
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException:
            # unused exception binding removed; full details come from
            # traceback.format_exc()
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        Gives the process PIDFILE_TIMEOUT seconds to appear before declaring
        it lost.
        """
        message = 'No pid found at %s' % self.pidfile_id
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        """@returns the process exit status, or None if still running."""
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """@returns The number of tests that failed or -1 if unknown."""
        self._get_pidfile_info()
        if self._state.num_tests_failed is None:
            return -1
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001302
1303
mbligh36768f02008-02-22 18:28:33 +00001304class Agent(object):
showard77182562009-06-10 00:16:05 +00001305 """
1306 An agent for use by the Dispatcher class to perform a sequence of tasks.
1307
1308 The following methods are required on all task objects:
1309 poll() - Called periodically to let the task check its status and
1310 update its internal state. If the task succeeded.
1311 is_done() - Returns True if the task is finished.
1312 abort() - Called when an abort has been requested. The task must
1313 set its aborted attribute to True if it actually aborted.
1314
1315 The following attributes are required on all task objects:
1316 aborted - bool, True if this task was aborted.
1317 failure_tasks - A sequence of tasks to be run using a new Agent
1318 by the dispatcher should this task fail.
1319 success - bool, True if this task succeeded.
1320 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1321 host_ids - A sequence of Host ids this task represents.
1322
1323 The following attribute is written to all task objects:
1324 agent - A reference to the Agent instance that the task has been
1325 added to.
1326 """
1327
1328
showard170873e2009-01-07 00:22:26 +00001329 def __init__(self, tasks, num_processes=1):
showard77182562009-06-10 00:16:05 +00001330 """
1331 @param tasks: A list of tasks as described in the class docstring.
1332 @param num_processes: The number of subprocesses the Agent represents.
1333 This is used by the Dispatcher for managing the load on the
1334 system. Defaults to 1.
1335 """
jadmanski0afbb632008-06-06 21:10:57 +00001336 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001337 self.queue = None
showard77182562009-06-10 00:16:05 +00001338 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001339 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001340 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001341
showard170873e2009-01-07 00:22:26 +00001342 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1343 for task in tasks)
1344 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1345
showardd3dc1992009-04-22 21:01:40 +00001346 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001347 for task in tasks:
1348 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001349
1350
showardd3dc1992009-04-22 21:01:40 +00001351 def _clear_queue(self):
1352 self.queue = Queue.Queue(0)
1353
1354
showard170873e2009-01-07 00:22:26 +00001355 def _union_ids(self, id_lists):
1356 return set(itertools.chain(*id_lists))
1357
1358
jadmanski0afbb632008-06-06 21:10:57 +00001359 def add_task(self, task):
1360 self.queue.put_nowait(task)
1361 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001362
1363
jadmanski0afbb632008-06-06 21:10:57 +00001364 def tick(self):
showard21baa452008-10-21 00:08:39 +00001365 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001366 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001367 self.active_task.poll()
1368 if not self.active_task.is_done():
1369 return
1370 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001371
1372
jadmanski0afbb632008-06-06 21:10:57 +00001373 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001374 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001375 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001376 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001377 if not self.active_task.success:
1378 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001379 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001380
jadmanski0afbb632008-06-06 21:10:57 +00001381 if not self.is_done():
1382 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001383
1384
jadmanski0afbb632008-06-06 21:10:57 +00001385 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001386 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001387 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1388 # get reset.
1389 new_agent = Agent(self.active_task.failure_tasks)
1390 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001391
mblighe2586682008-02-29 22:45:46 +00001392
showard4c5374f2008-09-04 17:02:56 +00001393 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001394 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001395
1396
jadmanski0afbb632008-06-06 21:10:57 +00001397 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001398 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001399
1400
showardd3dc1992009-04-22 21:01:40 +00001401 def abort(self):
showard08a36412009-05-05 01:01:13 +00001402 # abort tasks until the queue is empty or a task ignores the abort
1403 while not self.is_done():
1404 if not self.active_task:
1405 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001406 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001407 if not self.active_task.aborted:
1408 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001409 return
1410 self.active_task = None
1411
showardd3dc1992009-04-22 21:01:40 +00001412
showard77182562009-06-10 00:16:05 +00001413class DelayedCallTask(object):
1414 """
1415 A task object like AgentTask for an Agent to run that waits for the
1416 specified amount of time to have elapsed before calling the supplied
1417 callback once and finishing. If the callback returns anything, it is
1418 assumed to be a new Agent instance and will be added to the dispatcher.
1419
1420 @attribute end_time: The absolute posix time after which this task will
1421 call its callback when it is polled and be finished.
1422
1423 Also has all attributes required by the Agent class.
1424 """
1425 def __init__(self, delay_seconds, callback, now_func=None):
1426 """
1427 @param delay_seconds: The delay in seconds from now that this task
1428 will call the supplied callback and be done.
1429 @param callback: A callable to be called by this task once after at
1430 least delay_seconds time has elapsed. It must return None
1431 or a new Agent instance.
1432 @param now_func: A time.time like function. Default: time.time.
1433 Used for testing.
1434 """
1435 assert delay_seconds > 0
1436 assert callable(callback)
1437 if not now_func:
1438 now_func = time.time
1439 self._now_func = now_func
1440 self._callback = callback
1441
1442 self.end_time = self._now_func() + delay_seconds
1443
1444 # These attributes are required by Agent.
1445 self.aborted = False
1446 self.failure_tasks = ()
1447 self.host_ids = ()
1448 self.success = False
1449 self.queue_entry_ids = ()
1450 # This is filled in by Agent.add_task().
1451 self.agent = None
1452
1453
1454 def poll(self):
1455 if self._callback and self._now_func() >= self.end_time:
1456 new_agent = self._callback()
1457 if new_agent:
1458 self.agent.dispatcher.add_agent(new_agent)
1459 self._callback = None
1460 self.success = True
1461
1462
1463 def is_done(self):
1464 return not self._callback
1465
1466
1467 def abort(self):
1468 self.aborted = True
1469 self._callback = None
1470
1471
mbligh36768f02008-02-22 18:28:33 +00001472class AgentTask(object):
showarded2afea2009-07-07 20:54:07 +00001473 def __init__(self, cmd=None, working_directory=None, failure_tasks=[],
1474 pidfile_name=None, paired_with_pidfile=None,
1475 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001476 self.done = False
1477 self.failure_tasks = failure_tasks
jadmanski0afbb632008-06-06 21:10:57 +00001478 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001479 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001480 self.agent = None
showarded2afea2009-07-07 20:54:07 +00001481 self.monitor = recover_run_monitor
1482 self.started = bool(recover_run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +00001483 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001484 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001485 self.queue_entry_ids = []
1486 self.host_ids = []
1487 self.log_file = None
1488
1489
1490 def _set_ids(self, host=None, queue_entries=None):
1491 if queue_entries and queue_entries != [None]:
1492 self.host_ids = [entry.host.id for entry in queue_entries]
1493 self.queue_entry_ids = [entry.id for entry in queue_entries]
1494 else:
1495 assert host
1496 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001497
1498
jadmanski0afbb632008-06-06 21:10:57 +00001499 def poll(self):
showard08a36412009-05-05 01:01:13 +00001500 if not self.started:
1501 self.start()
1502 self.tick()
1503
1504
1505 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001506 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001507 exit_code = self.monitor.exit_code()
1508 if exit_code is None:
1509 return
1510 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001511 else:
1512 success = False
mbligh36768f02008-02-22 18:28:33 +00001513
jadmanski0afbb632008-06-06 21:10:57 +00001514 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001515
1516
jadmanski0afbb632008-06-06 21:10:57 +00001517 def is_done(self):
1518 return self.done
mbligh36768f02008-02-22 18:28:33 +00001519
1520
jadmanski0afbb632008-06-06 21:10:57 +00001521 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001522 if self.done:
1523 return
jadmanski0afbb632008-06-06 21:10:57 +00001524 self.done = True
1525 self.success = success
1526 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001527
1528
jadmanski0afbb632008-06-06 21:10:57 +00001529 def prolog(self):
showarded2afea2009-07-07 20:54:07 +00001530 assert not self.monitor
mblighd64e5702008-04-04 21:39:28 +00001531
mbligh36768f02008-02-22 18:28:33 +00001532
jadmanski0afbb632008-06-06 21:10:57 +00001533 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001534 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001535 _drone_manager.copy_to_results_repository(
1536 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001537
1538
jadmanski0afbb632008-06-06 21:10:57 +00001539 def epilog(self):
1540 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001541
1542
jadmanski0afbb632008-06-06 21:10:57 +00001543 def start(self):
1544 assert self.agent
1545
1546 if not self.started:
1547 self.prolog()
1548 self.run()
1549
1550 self.started = True
1551
1552
1553 def abort(self):
1554 if self.monitor:
1555 self.monitor.kill()
1556 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001557 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001558 self.cleanup()
1559
1560
showarded2afea2009-07-07 20:54:07 +00001561 def _get_consistent_execution_path(self, execution_entries):
1562 first_execution_path = execution_entries[0].execution_path()
1563 for execution_entry in execution_entries[1:]:
1564 assert execution_entry.execution_path() == first_execution_path, (
1565 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1566 execution_entry,
1567 first_execution_path,
1568 execution_entries[0]))
1569 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001570
1571
showarded2afea2009-07-07 20:54:07 +00001572 def _copy_results(self, execution_entries, use_monitor=None):
1573 """
1574 @param execution_entries: list of objects with execution_path() method
1575 """
showard6d1c1432009-08-20 23:30:39 +00001576 if use_monitor is not None and not use_monitor.has_process():
1577 return
1578
showarded2afea2009-07-07 20:54:07 +00001579 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001580 if use_monitor is None:
1581 assert self.monitor
1582 use_monitor = self.monitor
1583 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001584 execution_path = self._get_consistent_execution_path(execution_entries)
1585 results_path = execution_path + '/'
showard6b733412009-04-27 20:09:18 +00001586 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001587 results_path)
showardde634ee2009-01-30 01:44:24 +00001588
showarda1e74b32009-05-12 17:32:04 +00001589
1590 def _parse_results(self, queue_entries):
showardde634ee2009-01-30 01:44:24 +00001591 reparse_task = FinalReparseTask(queue_entries)
1592 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1593
1594
showarda1e74b32009-05-12 17:32:04 +00001595 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1596 self._copy_results(queue_entries, use_monitor)
1597 self._parse_results(queue_entries)
1598
1599
showardd3dc1992009-04-22 21:01:40 +00001600 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
showarded2afea2009-07-07 20:54:07 +00001601 assert not self.monitor
jadmanski0afbb632008-06-06 21:10:57 +00001602 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001603 self.monitor = PidfileRunMonitor()
1604 self.monitor.run(self.cmd, self._working_directory,
1605 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001606 log_file=self.log_file,
1607 pidfile_name=pidfile_name,
1608 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001609
1610
showardd9205182009-04-27 20:09:55 +00001611class TaskWithJobKeyvals(object):
1612 """AgentTask mixin providing functionality to help with job keyval files."""
1613 _KEYVAL_FILE = 'keyval'
1614 def _format_keyval(self, key, value):
1615 return '%s=%s' % (key, value)
1616
1617
1618 def _keyval_path(self):
1619 """Subclasses must override this"""
1620 raise NotImplemented
1621
1622
1623 def _write_keyval_after_job(self, field, value):
1624 assert self.monitor
1625 if not self.monitor.has_process():
1626 return
1627 _drone_manager.write_lines_to_file(
1628 self._keyval_path(), [self._format_keyval(field, value)],
1629 paired_with_process=self.monitor.get_process())
1630
1631
1632 def _job_queued_keyval(self, job):
1633 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1634
1635
1636 def _write_job_finished(self):
1637 self._write_keyval_after_job("job_finished", int(time.time()))
1638
1639
showarded2afea2009-07-07 20:54:07 +00001640class SpecialAgentTask(AgentTask):
1641 """
1642 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1643 """
1644
1645 TASK_TYPE = None
1646 host = None
1647 queue_entry = None
1648
1649 def __init__(self, task, extra_command_args, **kwargs):
1650 assert self.host
1651 assert (self.TASK_TYPE is not None,
1652 'self.TASK_TYPE must be overridden')
1653 self.task = task
showard8ac6f2a2009-07-16 14:50:32 +00001654 if task:
1655 kwargs['working_directory'] = task.execution_path()
showarded2afea2009-07-07 20:54:07 +00001656 self._extra_command_args = extra_command_args
1657 super(SpecialAgentTask, self).__init__(**kwargs)
1658
1659
1660 def prolog(self):
1661 super(SpecialAgentTask, self).prolog()
1662 self.task = models.SpecialTask.prepare(self, self.task)
1663 self.cmd = _autoserv_command_line(self.host.hostname,
1664 self._extra_command_args,
1665 queue_entry=self.queue_entry)
1666 self._working_directory = self.task.execution_path()
1667 self.task.activate()
1668
1669
showardb6681aa2009-07-08 21:15:00 +00001670 def cleanup(self):
1671 super(SpecialAgentTask, self).cleanup()
showarda640b2d2009-07-20 22:37:06 +00001672
1673 # self.task can be None if a SpecialAgentTask is aborted before the
1674 # prolog runs
1675 if self.task:
1676 self.task.finish()
1677
1678 if self.monitor and self.monitor.has_process() and self.task:
showarded2afea2009-07-07 20:54:07 +00001679 self._copy_results([self.task])
1680
1681
class RepairTask(SpecialAgentTask, TaskWithJobKeyvals):
    """Runs autoserv -R to repair a host, optionally failing an attached
    queue entry if the repair itself fails."""

    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, host, queue_entry=None, task=None,
                 recover_run_monitor=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        # look up the host's protection level and normalize it to the
        # attribute-style name that autoserv expects
        protection_name = host_protections.Protection.get_attr_name(
                host_protections.Protection.get_string(host.protection))

        self.host = host
        self.queue_entry = queue_entry

        super(RepairTask, self).__init__(
                task, ['-R', '--host-protection', protection_name],
                recover_run_monitor=recover_run_monitor)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status('Repairing')


    def _keyval_path(self):
        return os.path.join(self._working_directory, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Mark self.queue_entry failed, first copying the repair logs
        into the entry's normal results location."""
        entry = self.queue_entry
        assert entry

        if entry.meta_host:
            return  # don't fail metahost entries, they'll be reassigned

        entry.update_from_database()
        if entry.status != 'Queued':
            return  # entry has been aborted

        entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
                self.monitor.get_process(),
                source_path=self._working_directory + '/',
                destination_path=entry.execution_path() + '/')

        self._copy_results([entry])
        if entry.job.parse_failed_repair:
            self._parse_results([entry])
        entry.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()

        if not self.success:
            self.host.set_status('Repair Failed')
            if self.queue_entry:
                self._fail_queue_entry()
        else:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001753
1754
showarded2afea2009-07-07 20:54:07 +00001755class PreJobTask(SpecialAgentTask):
showard170873e2009-01-07 00:22:26 +00001756 def epilog(self):
1757 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001758 should_copy_results = (self.queue_entry and not self.success
1759 and not self.queue_entry.meta_host)
1760 if should_copy_results:
1761 self.queue_entry.set_execution_subdir()
showarded2afea2009-07-07 20:54:07 +00001762 log_name = os.path.basename(self.task.execution_path())
1763 source = os.path.join(self.task.execution_path(), 'debug',
1764 'autoserv.DEBUG')
1765 destination = os.path.join(self.queue_entry.execution_path(),
1766 log_name)
showard170873e2009-01-07 00:22:26 +00001767 _drone_manager.copy_to_results_repository(
showarded2afea2009-07-07 20:54:07 +00001768 self.monitor.get_process(), source,
showard170873e2009-01-07 00:22:26 +00001769 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001770
showard58721a82009-08-20 23:32:40 +00001771 if not self.success and self.queue_entry:
1772 self.queue_entry.requeue()
1773
showard8fe93b52008-11-18 17:53:22 +00001774
class VerifyTask(PreJobTask):
    """Runs autoserv -v to verify a host, falling back to a RepairTask on
    failure."""

    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, queue_entry=None, host=None, task=None,
                 recover_run_monitor=None):
        # exactly one of queue_entry/host may be supplied
        assert bool(queue_entry) != bool(host)
        if host:
            self.host = host
        else:
            self.host = queue_entry.host
        self.queue_entry = queue_entry

        super(VerifyTask, self).__init__(
                task, ['-v'],
                failure_tasks=[RepairTask(self.host,
                                          queue_entry=queue_entry)],
                recover_run_monitor=recover_run_monitor)

        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')

        # Delete any other queued verifies for this host. One verify will do
        # and there's no need to keep records of other requests.
        models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False).exclude(
                        id=self.task.id).delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001815
1816
showardb5626452009-06-30 01:57:28 +00001817class CleanupHostsMixin(object):
1818 def _reboot_hosts(self, job, queue_entries, final_success,
1819 num_tests_failed):
1820 reboot_after = job.reboot_after
1821 do_reboot = (
1822 # always reboot after aborted jobs
1823 self._final_status == models.HostQueueEntry.Status.ABORTED
1824 or reboot_after == models.RebootAfter.ALWAYS
1825 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
1826 and final_success and num_tests_failed == 0))
1827
1828 for queue_entry in queue_entries:
1829 if do_reboot:
1830 # don't pass the queue entry to the CleanupTask. if the cleanup
1831 # fails, the job doesn't care -- it's over.
1832 cleanup_task = CleanupTask(host=queue_entry.host)
1833 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
1834 else:
1835 queue_entry.host.set_status('Ready')
1836
1837
class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
    """
    AgentTask that runs the main autoserv process for a job over a group
    of queue entries, writing job-level and per-host keyvals around it.
    """
    def __init__(self, job, queue_entries, cmd=None, group_name='',
                 recover_run_monitor=None):
        # job: the Job the queue entries belong to
        # queue_entries: entries this autoserv run covers; they share one
        #     execution path (see _execution_path())
        # group_name: if non-empty, recorded as the host_group_name keyval
        self.job = job
        self.queue_entries = queue_entries
        self.group_name = group_name
        super(QueueTask, self).__init__(
                cmd, self._execution_path(),
                recover_run_monitor=recover_run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        """Path of the job-level keyval file within the execution path."""
        return os.path.join(self._execution_path(), self._KEYVAL_FILE)


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict's contents as a file at keyval_path, to be
        laid down alongside the job when its process starts."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_path(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        """Write keyval_dict into the job-level keyval file."""
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Write the host's platform and labels to a per-host keyval file
        under host_keyvals/<hostname>."""
        keyval_path = os.path.join(self._execution_path(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_path(self):
        # all entries in this group share an execution path, so the first
        # entry's path stands for the whole group
        return self.queue_entries[0].execution_path()


    def prolog(self):
        """Record pre-run keyvals and mark entries and hosts Running."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        if self.group_name:
            keyval_dict['host_group_name'] = self.group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.update_field('started_on', datetime.datetime.now())
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1 and len(self.queue_entries) == 1:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        """Drop a job_failure marker noting that autoserv's process was
        lost."""
        error_file_path = os.path.join(self._execution_path(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Post-run bookkeeping: write the finish-time keyval, schedule a
        GatherLogsTask, and fail the entries if the process was lost."""
        if not self.monitor:
            # no autoserv process was ever launched for this task
            return

        self._write_job_finished()

        gather_task = GatherLogsTask(self.job, self.queue_entries)
        self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))

        if self.monitor.lost_process:
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        """Append an INFO comment line to the job's status.log."""
        _drone_manager.write_lines_to_file(
                os.path.join(self._execution_path(), 'status.log'),
                ['INFO\t----\t----\t' + comment],
                paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job and when, in keyvals and status.log."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001966
1967
class PostJobTask(AgentTask):
    """
    Base class for tasks run over a job's results after the main autoserv
    process has exited (e.g. crashinfo collection, final parsing).

    Subclasses must implement _generate_command().
    """
    def __init__(self, queue_entries, pidfile_name, logfile_name,
                 recover_run_monitor=None):
        self._queue_entries = queue_entries
        self._pidfile_name = pidfile_name

        self._execution_path = self._get_consistent_execution_path(
                queue_entries)
        self._results_dir = _drone_manager.absolute_path(self._execution_path)
        # attach to the existing autoserv pidfile so we can read its exit code
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_path)
        self._paired_with_pidfile = self._autoserv_monitor.pidfile_id

        if _testing_mode:
            command = 'true'
        else:
            command = self._generate_command(self._results_dir)

        super(PostJobTask, self).__init__(
                cmd=command, working_directory=self._execution_path,
                recover_run_monitor=recover_run_monitor)

        self.log_file = os.path.join(self._execution_path, logfile_name)
        self._final_status = self._determine_final_status()


    def _generate_command(self, results_dir):
        """Return the command (list of args) to run; must be overridden."""
        raise NotImplementedError('Subclasses must override this')


    def _job_was_aborted(self):
        """
        Return True if this task's queue entries were aborted.

        All entries should agree on their abort state; on inconsistency a
        notification email is enqueued and True is assumed.
        """
        was_aborted = None
        for queue_entry in self._queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                # describe every entry; joining must iterate the entries, not
                # the characters of a single formatted string
                entries_description = ', '.join(
                        '%s (%s)' % (entry, entry.aborted)
                        for entry in self._queue_entries)
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state: ' +
                        entries_description)
                # don't crash here, just assume true
                return True
        return was_aborted


    def _determine_final_status(self):
        """Map abort state and autoserv's exit code to a final HQE status."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def run(self):
        # make sure we actually have results to work with.
        # this should never happen in normal operation.
        if not self._autoserv_monitor.has_process():
            email_manager.manager.enqueue_notify_email(
                    'No results in post-job task',
                    'No results in post-job task at %s' %
                    self._autoserv_monitor.pidfile_id)
            self.finished(False)
            return

        super(PostJobTask, self).run(
                pidfile_name=self._pidfile_name,
                paired_with_pidfile=self._paired_with_pidfile)


    def _set_all_statuses(self, status):
        """Set every associated queue entry to the given status."""
        for queue_entry in self._queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass
2049
2050
class GatherLogsTask(PostJobTask, CleanupHostsMixin):
    """
    Post-job task that runs autoserv --collect-crashinfo for a job.

    Responsibilities:
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, recover_run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
                queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
                logfile_name='.collect_crashinfo.log',
                recover_run_monitor=recover_run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        # run collect-crashinfo against every host in this job's entries
        hostnames = [entry.host.hostname for entry in self._queue_entries]
        return [_autoserv_path, '-p', '--collect-crashinfo',
                '-m', ','.join(hostnames), '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def epilog(self):
        super(GatherLogsTask, self).epilog()

        self._copy_and_parse_results(self._queue_entries,
                                     use_monitor=self._autoserv_monitor)

        if self._autoserv_monitor.has_process():
            final_success = (self._final_status ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            # no autoserv process to read results from; assume the worst
            final_success = False
            num_tests_failed = 0

        self._reboot_hosts(self._job, self._queue_entries, final_success,
                           num_tests_failed)


    def run(self):
        exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if exit_code is not None and not os.WIFSIGNALED(exit_code):
            self.finished(True)
            return
        super(GatherLogsTask, self).run()
showard0bbfc212009-04-29 21:06:13 +00002106
2107
class CleanupTask(PreJobTask):
    """Runs autoserv --cleanup on a host, falling back to repair on failure."""
    TASK_TYPE = models.SpecialTask.Task.CLEANUP


    def __init__(self, host=None, queue_entry=None, task=None,
                 recover_run_monitor=None):
        # exactly one of host / queue_entry must be supplied
        assert bool(host) ^ bool(queue_entry)
        if queue_entry:
            host = queue_entry.get_host()
        self.host = host
        self.queue_entry = queue_entry

        # on cleanup failure, fall back to repairing the host
        fallback_repair = RepairTask(host, queue_entry=queue_entry)
        super(CleanupTask, self).__init__(
                task, ['--cleanup'], failure_tasks=[fallback_repair],
                recover_run_monitor=recover_run_monitor)

        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status("Cleaning")


    def epilog(self):
        super(CleanupTask, self).epilog()

        if not self.success:
            return
        # cleanup succeeded: host is usable again and no longer dirty
        self.host.set_status('Ready')
        self.host.update_field('dirty', 0)
2140
2141
class FinalReparseTask(PostJobTask):
    """
    Runs the results parser over a finished job's results directory and then
    moves the job's queue entries to their final status.

    The number of concurrently running parse processes is throttled across
    all instances via the class-level _num_running_parses counter, compared
    against scheduler_config.config.max_parse_processes.
    """
    # class-wide count of currently running parse processes
    _num_running_parses = 0

    def __init__(self, queue_entries, recover_run_monitor=None):
        super(FinalReparseTask, self).__init__(
                queue_entries, pidfile_name=_PARSER_PID_FILE,
                logfile_name='.parse.log',
                recover_run_monitor=recover_run_monitor)
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]
        # True once the parse process has actually been launched (a recovered
        # task counts as already started)
        self._parse_started = self.started


    @classmethod
    def _increment_running_parses(cls):
        cls._num_running_parses += 1


    @classmethod
    def _decrement_running_parses(cls):
        cls._num_running_parses -= 1


    @classmethod
    def _can_run_new_parse(cls):
        # True while we're below the configured parse-process limit
        return (cls._num_running_parses <
                scheduler_config.config.max_parse_processes)


    def prolog(self):
        super(FinalReparseTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.PARSING)


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        # move entries to the final status computed in PostJobTask.__init__
        self._set_all_statuses(self._final_status)


    def _generate_command(self, results_dir):
        # parser invocation: pidfile, log level 2, reparse, one-shot, write
        # to the results dir
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
                results_dir]


    def tick(self):
        # override tick to keep trying to start until the parse count goes down
        # and we can, at which point we revert to default behavior
        if self._parse_started:
            super(FinalReparseTask, self).tick()
        else:
            self._try_starting_parse()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_parse()


    def _try_starting_parse(self):
        if not self._can_run_new_parse():
            return

        # actually run the parse command
        super(FinalReparseTask, self).run()

        self._increment_running_parses()
        self._parse_started = True


    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        # only decrement if we incremented (i.e. the parse actually started)
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002215
2216
class SetEntryPendingTask(AgentTask):
    """
    No-op task (empty command) that moves a queue entry to Pending via
    on_pending() and schedules any follow-up agent it returns.
    """
    def __init__(self, queue_entry):
        super(SetEntryPendingTask, self).__init__(cmd='')
        self._queue_entry = queue_entry
        self._set_ids(queue_entries=[queue_entry])


    def run(self):
        next_agent = self._queue_entry.on_pending()
        if next_agent:
            # hand the follow-up agent to our dispatcher
            self.agent.dispatcher.add_agent(next_agent)
        self.finished(True)
2229
2230
class DBError(Exception):
    """Raised by the DBObject constructor when its select fails."""
2233
2234
class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id - primary key to SELECT by (exclusive with row).
        @param row - a pre-fetched row of values in _fields order.
        @param new_record - True if this represents a not-yet-saved row.
        @param always_query - if False and this instance was already
                initialized (returned from the cache by __new__), skip the
                requery.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """SELECT this object's row by id; raises DBError if not found."""
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in memory fields.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is in _fields but should never be mutated via update_field()
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Requery our row and refresh all field attributes."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table = None):
        """
        Return the number of rows matching the given WHERE clause.

        NOTE: `where` is interpolated directly into the SQL, so callers must
        never pass untrusted input here.
        """
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Set a single column both in the database and on this instance."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return  # no-op when the value is unchanged

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """
        INSERT this object's fields as a new row (only valid when it was
        constructed with new_record=True); updates self.id afterwards.

        NOTE: values are quoted via string interpolation rather than query
        parameters, so they must come from trusted sources.
        """
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this row from the database and drop it from the cache."""
        # The cache key must use self.id here; a bare `id` would be the
        # builtin function and the pop would silently never match.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        # prepend prefix only when string is non-empty
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        for row in rows:
            yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002433
mbligh36768f02008-02-22 18:28:33 +00002434
class IneligibleHostQueue(DBObject):
    """A row in ineligible_host_queues: blocks a host from a job's entries."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002438
2439
class AtomicGroup(DBObject):
    """A row in the atomic_groups table."""
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00002444
2445
class Label(DBObject):
    """A row in the labels table."""
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)
2455
2456
class Host(DBObject):
    """A row in the hosts table."""
    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def set_status(self, status):
        """Update this host's status column (and log the transition)."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    def reverify_tasks(self):
        """Return [CleanupTask, VerifyTask] to re-verify this host."""
        cleanup_task = CleanupTask(host=self)
        verify_task = VerifyTask(host=self)

        # just to make sure this host does not get taken away
        self.set_status('Cleaning')
        return [cleanup_task, verify_task]


    # matches hostnames of the form <name><digits>, e.g. "host12"
    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes.  Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # int() in base 10 copes with leading zeros ('09' -> 9); the
            # previous lstrip('0') approach raised ValueError on all-zero
            # digit strings such as 'host0' or 'host000'.
            number_a = int(number_a_str, 10)
            number_b = int(number_b_str, 10)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
2535
2536
class HostQueueEntry(DBObject):
    """A row in host_queue_entries: one (job, host) scheduling unit."""
    _table_name = 'host_queue_entries'
    # column order must match the table schema
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002542
2543
showarda3c58572009-03-12 20:36:59 +00002544 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002545 assert id or row
showarda3c58572009-03-12 20:36:59 +00002546 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002547 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002548
jadmanski0afbb632008-06-06 21:10:57 +00002549 if self.host_id:
2550 self.host = Host(self.host_id)
2551 else:
2552 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002553
showard77182562009-06-10 00:16:05 +00002554 if self.atomic_group_id:
2555 self.atomic_group = AtomicGroup(self.atomic_group_id,
2556 always_query=False)
2557 else:
2558 self.atomic_group = None
2559
showard170873e2009-01-07 00:22:26 +00002560 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002561 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002562
2563
showard89f84db2009-03-12 20:39:13 +00002564 @classmethod
2565 def clone(cls, template):
2566 """
2567 Creates a new row using the values from a template instance.
2568
2569 The new instance will not exist in the database or have a valid
2570 id attribute until its save() method is called.
2571 """
2572 assert isinstance(template, cls)
2573 new_row = [getattr(template, field) for field in cls._fields]
2574 clone = cls(row=new_row, new_record=True)
2575 clone.id = None
2576 return clone
2577
2578
showardc85c21b2008-11-24 22:17:37 +00002579 def _view_job_url(self):
2580 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2581
2582
showardf1ae3542009-05-11 19:26:02 +00002583 def get_labels(self):
2584 """
2585 Get all labels associated with this host queue entry (either via the
2586 meta_host or as a job dependency label). The labels yielded are not
2587 guaranteed to be unique.
2588
2589 @yields Label instances associated with this host_queue_entry.
2590 """
2591 if self.meta_host:
2592 yield Label(id=self.meta_host, always_query=False)
2593 labels = Label.fetch(
2594 joins="JOIN jobs_dependency_labels AS deps "
2595 "ON (labels.id = deps.label_id)",
2596 where="deps.job_id = %d" % self.job.id)
2597 for label in labels:
2598 yield label
2599
2600
    def set_host(self, host):
        """
        Assign or release this entry's host.

        @param host - a Host instance to assign, or None to release the
                currently assigned host.
        """
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            # keep other entries of this job off the host
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host
mbligh36768f02008-02-22 18:28:33 +00002613
2614
    def get_host(self):
        """Return the Host assigned to this entry, or None."""
        return self.host
mbligh36768f02008-02-22 18:28:33 +00002617
2618
jadmanski0afbb632008-06-06 21:10:57 +00002619 def queue_log_record(self, log_line):
2620 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002621 _drone_manager.write_lines_to_file(self.queue_log_path,
2622 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002623
2624
jadmanski0afbb632008-06-06 21:10:57 +00002625 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002626 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002627 row = [0, self.job.id, host_id]
2628 block = IneligibleHostQueue(row=row, new_record=True)
2629 block.save()
mblighe2586682008-02-29 22:45:46 +00002630
2631
jadmanski0afbb632008-06-06 21:10:57 +00002632 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002633 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002634 blocks = IneligibleHostQueue.fetch(
2635 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2636 for block in blocks:
2637 block.delete()
mblighe2586682008-02-29 22:45:46 +00002638
2639
showard2bab8f42008-11-12 18:15:22 +00002640 def set_execution_subdir(self, subdir=None):
2641 if subdir is None:
2642 assert self.get_host()
2643 subdir = self.get_host().hostname
2644 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002645
2646
showard6355f6b2008-12-05 18:52:13 +00002647 def _get_hostname(self):
2648 if self.host:
2649 return self.host.hostname
2650 return 'no host'
2651
2652
    def __str__(self):
        # e.g. "host1/42 (123)": hostname / job id (entry id)
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2655
2656
jadmanski0afbb632008-06-06 21:10:57 +00002657 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002658 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002659
showardb18134f2009-03-20 20:52:18 +00002660 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002661
showardc85c21b2008-11-24 22:17:37 +00002662 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002663 self.update_field('complete', False)
2664 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002665
jadmanski0afbb632008-06-06 21:10:57 +00002666 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002667 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002668 self.update_field('complete', False)
2669 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002670
showardc85c21b2008-11-24 22:17:37 +00002671 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002672 self.update_field('complete', True)
2673 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002674
2675 should_email_status = (status.lower() in _notify_email_statuses or
2676 'all' in _notify_email_statuses)
2677 if should_email_status:
2678 self._email_on_status(status)
2679
2680 self._email_on_job_complete()
2681
2682
    def _email_on_status(self, status):
        """Email the job's notification list about this entry's new status."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002692
2693
2694 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002695 if not self.job.is_finished():
2696 return
showard542e8402008-09-19 20:16:18 +00002697
showardc85c21b2008-11-24 22:17:37 +00002698 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002699 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002700 for queue_entry in hosts_queue:
2701 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002702 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002703 queue_entry.status))
2704
2705 summary_text = "\n".join(summary_text)
2706 status_counts = models.Job.objects.get_status_counts(
2707 [self.job.id])[self.job.id]
2708 status = ', '.join('%d %s' % (count, status) for status, count
2709 in status_counts.iteritems())
2710
2711 subject = 'Autotest: Job ID: %s "%s" %s' % (
2712 self.job.id, self.job.name, status)
2713 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2714 self.job.id, self.job.name, status, self._view_job_url(),
2715 summary_text)
showard170873e2009-01-07 00:22:26 +00002716 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002717
2718
showard77182562009-06-10 00:16:05 +00002719 def run_pre_job_tasks(self, assigned_host=None):
2720 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002721 assert assigned_host
2722 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002723 if self.host_id is None:
2724 self.set_host(assigned_host)
2725 else:
2726 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002727
showardcfd4a7e2009-07-11 01:47:33 +00002728 logging.info("%s/%s/%s scheduled on %s, status=%s",
showardb18134f2009-03-20 20:52:18 +00002729 self.job.name, self.meta_host, self.atomic_group_id,
2730 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002731
showard77182562009-06-10 00:16:05 +00002732 return self._do_run_pre_job_tasks()
2733
2734
2735 def _do_run_pre_job_tasks(self):
2736 # Every host goes thru the Verifying stage (which may or may not
2737 # actually do anything as determined by get_pre_job_tasks).
2738 self.set_status(models.HostQueueEntry.Status.VERIFYING)
2739
2740 # The pre job tasks always end with a SetEntryPendingTask which
2741 # will continue as appropriate through queue_entry.on_pending().
2742 return Agent(self.job.get_pre_job_tasks(queue_entry=self))
mblighe2586682008-02-29 22:45:46 +00002743
showard6ae5ea92009-02-25 00:11:51 +00002744
jadmanski0afbb632008-06-06 21:10:57 +00002745 def requeue(self):
showardcfd4a7e2009-07-11 01:47:33 +00002746 assert self.host
jadmanski0afbb632008-06-06 21:10:57 +00002747 self.set_status('Queued')
showard12f3e322009-05-13 21:27:42 +00002748 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002749 # verify/cleanup failure sets the execution subdir, so reset it here
2750 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002751 if self.meta_host:
2752 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002753
2754
jadmanski0afbb632008-06-06 21:10:57 +00002755 def handle_host_failure(self):
2756 """\
2757 Called when this queue entry's host has failed verification and
2758 repair.
2759 """
2760 assert not self.meta_host
2761 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002762 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002763
2764
jadmanskif7fa2cc2008-10-01 14:13:23 +00002765 @property
2766 def aborted_by(self):
2767 self._load_abort_info()
2768 return self._aborted_by
2769
2770
2771 @property
2772 def aborted_on(self):
2773 self._load_abort_info()
2774 return self._aborted_on
2775
2776
2777 def _load_abort_info(self):
2778 """ Fetch info about who aborted the job. """
2779 if hasattr(self, "_aborted_by"):
2780 return
2781 rows = _db.execute("""
2782 SELECT users.login, aborted_host_queue_entries.aborted_on
2783 FROM aborted_host_queue_entries
2784 INNER JOIN users
2785 ON users.id = aborted_host_queue_entries.aborted_by_id
2786 WHERE aborted_host_queue_entries.queue_entry_id = %s
2787 """, (self.id,))
2788 if rows:
2789 self._aborted_by, self._aborted_on = rows[0]
2790 else:
2791 self._aborted_by = self._aborted_on = None
2792
2793
showardb2e2c322008-10-14 17:33:55 +00002794 def on_pending(self):
2795 """
2796 Called when an entry in a synchronous job has passed verify. If the
2797 job is ready to run, returns an agent to run the job. Returns None
2798 otherwise.
2799 """
2800 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002801 self.get_host().set_status('Pending')
showardb000a8d2009-07-28 20:02:07 +00002802
2803 # Some debug code here: sends an email if an asynchronous job does not
2804 # immediately enter Starting.
2805 # TODO: Remove this once we figure out why asynchronous jobs are getting
2806 # stuck in Pending.
2807 agent = self.job.run_if_ready(queue_entry=self)
2808 if self.job.synch_count == 1 and agent is None:
2809 subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
2810 message = 'Asynchronous job stuck in Pending'
2811 email_manager.manager.enqueue_notify_email(subject, message)
2812 return agent
showardb2e2c322008-10-14 17:33:55 +00002813
2814
showardd3dc1992009-04-22 21:01:40 +00002815 def abort(self, dispatcher):
2816 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002817
showardd3dc1992009-04-22 21:01:40 +00002818 Status = models.HostQueueEntry.Status
2819 has_running_job_agent = (
2820 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2821 and dispatcher.get_agents_for_entry(self))
2822 if has_running_job_agent:
2823 # do nothing; post-job tasks will finish and then mark this entry
2824 # with status "Aborted" and take care of the host
2825 return
2826
2827 if self.status in (Status.STARTING, Status.PENDING):
2828 self.host.set_status(models.Host.Status.READY)
2829 elif self.status == Status.VERIFYING:
2830 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2831
2832 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002833
2834 def execution_tag(self):
2835 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00002836 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002837
2838
showarded2afea2009-07-07 20:54:07 +00002839 def execution_path(self):
2840 return self.execution_tag()
2841
2842
class Job(DBObject):
    """
    Scheduler-side view of a row in the `jobs` table.

    Implements job-level scheduling logic: deciding when enough hosts are
    Pending to launch (including the atomic-group ready delay), grouping
    queue entries into execution-subdirectory groups, building the autoserv
    command line, and creating the Agent/Task objects that run the job.
    """
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs')

    # This does not need to be a column in the DB.  The delays are likely to
    # be configured short.  If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it.  Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time.  Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs incase a delay was running when the
    # scheduler was restarted and no more hosts ever successfully exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)


    def is_server_job(self):
        # control_type 2 denotes a client-side control file; all other
        # values are run server-side.
        return self.control_type != 2


    def tag(self):
        """Unique '<id>-<owner>' tag naming this job's results directory."""
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        """@returns All HostQueueEntry objects for this job (at least one)."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries)>0

        return entries


    def set_status(self, status, update_queues=False):
        """
        Set this job's status field.

        @param update_queues: When True, propagate the status to every one
                of the job's host queue entries as well.
        """
        self.update_field('status',status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def _atomic_and_has_started(self):
        """
        @returns True if any of the HostQueueEntries associated with this job
        have entered the Status.STARTING state or beyond.
        """
        atomic_entries = models.HostQueueEntry.objects.filter(
                job=self.id, atomic_group__isnull=False)
        if atomic_entries.count() <= 0:
            return False

        # These states may *only* be reached if Job.run() has been called.
        started_statuses = (models.HostQueueEntry.Status.STARTING,
                            models.HostQueueEntry.Status.RUNNING,
                            models.HostQueueEntry.Status.COMPLETED)

        started_entries = atomic_entries.filter(status__in=started_statuses)
        return started_entries.count() > 0


    def _hosts_assigned_count(self):
        """The number of HostQueueEntries assigned a Host for this job."""
        entries = models.HostQueueEntry.objects.filter(job=self.id,
                                                       host__isnull=False)
        return entries.count()


    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()


    def is_ready(self):
        """
        @returns True if enough entries are Pending for this job to start
                (and, for atomic group jobs, it has not already started).
        """
        # NOTE: Atomic group jobs stop reporting ready after they have been
        # started to avoid launching multiple copies of one atomic job.
        # Only possible if synch_count is less than half the number of
        # machines in the atomic group.
        pending_count = self._pending_count()
        atomic_and_has_started = self._atomic_and_has_started()
        # Reuse the value computed above instead of re-querying the database
        # (the original issued a second identical count query here).
        ready = (pending_count >= self.synch_count
                 and not atomic_and_has_started)

        if not ready:
            logging.info(
                    'Job %s not ready: %s pending, %s required '
                    '(Atomic and started: %s)',
                    self, pending_count, self.synch_count,
                    atomic_and_has_started)

        return ready


    def num_machines(self, clause=None):
        """
        Count this job's host queue entries.

        @param clause: Optional extra SQL condition ANDed into the count.
        """
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        """Number of entries not yet complete."""
        return self.num_machines('not complete')


    def num_active(self):
        """Number of currently active entries."""
        return self.num_machines('active')


    def num_complete(self):
        """Number of completed entries."""
        return self.num_machines('complete')


    def is_finished(self):
        """@returns True once every entry of this job is complete."""
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        """QuerySet of this job's entries still in Queued/Pending
        (and optionally Verifying) states."""
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Stop every not-yet-run entry, releasing Pending hosts back to
        Ready."""
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()

    def stop_if_necessary(self):
        """Stop the job's remaining entries if too few are left to ever
        satisfy synch_count."""
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        """Append the entry's hostname to the job's .machines results file."""
        hostname = queue_entry.get_host().hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self, group_name=''):
        """@returns a directory name to use for the next host group results."""
        if group_name:
            # Sanitize for use as a pathname.
            group_name = group_name.replace(os.path.sep, '_')
            if group_name.startswith('.'):
                group_name = '_' + group_name[1:]
            # Add a separator between the group name and 'group%d'.
            group_name += '.'
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)


    def _write_control_file(self, execution_tag):
        """Attach this job's control file to the execution and return its
        path on the drone."""
        control_path = _drone_manager.attach_file_to_execution(
                execution_tag, self.control_file)
        return control_path


    def get_group_entries(self, queue_entry_from_group):
        """@returns All entries sharing the given entry's execution subdir."""
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _get_autoserv_params(self, queue_entries):
        """Build the autoserv command-line argument list for running the
        given group of queue entries."""
        assert queue_entries
        execution_tag = queue_entries[0].execution_tag()
        control_path = self._write_control_file(execution_tag)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self, verbose=False)

        if not self.is_server_job():
            # -c tells autoserv this is a client-side control file.
            params.append('-c')

        return params


    def _should_run_cleanup(self, queue_entry):
        """@returns True if the entry's host needs a Cleanup task before
        running, per this job's reboot_before setting."""
        if self.reboot_before == models.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == models.RebootBefore.IF_DIRTY:
            return queue_entry.get_host().dirty
        return False


    def should_run_verify(self, queue_entry):
        """@returns True if the entry's host should be verified before the
        job runs (host protection permitting)."""
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def get_pre_job_tasks(self, queue_entry):
        """
        Get a list of tasks to perform before the host_queue_entry
        may be used to run this Job (such as Cleanup & Verify).

        @returns A list of tasks to be done to the given queue_entry before
                it should be considered be ready to run this job.  The last
                task in the list calls HostQueueEntry.on_pending(), which
                continues the flow of the job.
        """
        tasks = []
        if self._should_run_cleanup(queue_entry):
            tasks.append(CleanupTask(queue_entry=queue_entry))
        if self.should_run_verify(queue_entry):
            tasks.append(VerifyTask(queue_entry=queue_entry))
        tasks.append(SetEntryPendingTask(queue_entry))
        return tasks


    def _assign_new_group(self, queue_entries, group_name=''):
        """Assign all given entries a common execution subdirectory name."""
        if len(queue_entries) == 1:
            # Single-host groups are named after the host itself.
            group_subdir_name = queue_entries[0].get_host().hostname
        else:
            group_subdir_name = self._next_group_name(group_name)
            logging.info('Running synchronous job %d hosts %s as %s',
                    self.id, [entry.host.hostname for entry in queue_entries],
                    group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A tuple containing a list of HostQueueEntry instances to be
                used to run this Job, a string group name to suggest giving
                to this job in the results database.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        assert len(chosen_entries) >= self.synch_count

        if atomic_group:
            # Look at any meta_host and dependency labels and pick the first
            # one that also specifies this atomic group.  Use that label name
            # as the group name if possible (it is more specific).
            group_name = atomic_group.name
            for label in include_queue_entry.get_labels():
                if label.atomic_group_id:
                    assert label.atomic_group_id == atomic_group.id
                    group_name = label.name
                    break
        else:
            group_name = ''

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries, group_name


    def run_if_ready(self, queue_entry):
        """
        @returns An Agent instance to ultimately run this job if enough hosts
                are ready for it to run.
        @returns None and potentially cleans up excess hosts if this Job
                is not ready to run.
        """
        if not self.is_ready():
            self.stop_if_necessary()
            return None

        if queue_entry.atomic_group:
            return self.run_with_ready_delay(queue_entry)

        return self.run(queue_entry)


    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, the a delay cannot be reset.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run_if_ready when the delay is up.

        @returns An Agent to run the job as appropriate or None if a delay
                has already been set.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        pending_threshold = min(self._hosts_assigned_count(),
                                queue_entry.atomic_group.max_number_of_machines)
        over_max_threshold = (self._pending_count() >= pending_threshold)
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough?  Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            return self.run(queue_entry)

        # A delay was previously scheduled.
        if self._delay_ready_task:
            return None

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self.id)
            return self.run(queue_entry)

        logging.info('Job %s waiting up to %s seconds for more hosts.',
                     self.id, delay)
        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)

        return Agent([self._delay_ready_task], num_processes=0)


    def run(self, queue_entry):
        """
        @param queue_entry: The HostQueueEntry instance calling this method.
        @returns An Agent instance to run this job or None if we've already
                been run.
        """
        if queue_entry.atomic_group and self._atomic_and_has_started():
            logging.error('Job.run() called on running atomic Job %d '
                          'with HQE %s.', self.id, queue_entry)
            return None
        queue_entries, group_name = self._choose_group_to_run(queue_entry)
        return self._finish_run(queue_entries, group_name)


    def _finish_run(self, queue_entries, group_name):
        """Mark the chosen entries Starting and build the Agent that runs
        the autoserv QueueTask for them."""
        for queue_entry in queue_entries:
            queue_entry.set_status('Starting')
        params = self._get_autoserv_params(queue_entries)
        queue_task = QueueTask(job=self, queue_entries=queue_entries,
                               cmd=params, group_name=group_name)
        tasks = [queue_task]
        if self._delay_ready_task:
            # Cancel any pending callback that would try to run again
            # as we are already running.
            self._delay_ready_task.abort()

        return Agent(tasks, num_processes=len(queue_entries))


    def __str__(self):
        return '%s-%s' % (self.id, self.owner)
3245
3246
# Script entry point; main() is defined earlier in this module.
if __name__ == '__main__':
    main()