#!/usr/bin/python -u

"""
Autotest scheduler
"""


import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
import itertools, logging, weakref
import common
import MySQLdb
from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.frontend import setup_django_environment
from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import monitor_db_cleanup
from autotest_lib.scheduler import status_server, scheduler_config


RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min

_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()


def _site_init_monitor_db_dummy():
    return {}


def main():
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
97 msg = ("Scheduler not enabled, set enable_scheduler to true in the "
98 "global_config's SCHEDULER section to enabled it. Exiting.")
mbligh6fbdb802009-08-03 16:42:55 +000099 logging.error(msg)
showard5613c662009-06-08 23:30:33 +0000100 sys.exit(1)
101
jadmanski0afbb632008-06-06 21:10:57 +0000102 global RESULTS_DIR
103 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +0000104
mbligh83c1e9e2009-05-01 23:10:41 +0000105 site_init = utils.import_site_function(__file__,
106 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
107 _site_init_monitor_db_dummy)
108 site_init()
109
showardcca334f2009-03-12 20:38:34 +0000110 # Change the cwd while running to avoid issues incase we were launched from
111 # somewhere odd (such as a random NFS home directory of the person running
112 # sudo to launch us as the appropriate user).
113 os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
        logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def init():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    autoserv_argv = [_autoserv_path, '-p', '-m', machines,
                     '-r', drone_manager.WORKING_DIRECTORY]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
    if verbose:
        autoserv_argv.append('--verbose')
    return autoserv_argv + extra_args

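# Illustrative sketch only, not used by the scheduler itself: assuming an
# install under /usr/local/autotest and a job owned by 'debug_user' named
# 'sleeptest' (example values), a call like
#     _autoserv_command_line('host1,host2', ['-c'], job=job)
# would return roughly
#     ['/usr/local/autotest/server/autoserv', '-p', '-m', 'host1,host2',
#      '-r', drone_manager.WORKING_DIRECTORY,
#      '-u', 'debug_user', '-l', 'sleeptest', '--verbose', '-c']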

class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""


class HostScheduler(object):
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result

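    # Illustrative sketch of what this helper builds, assuming rows fetched
    # as (job_id, aclgroup_id) pairs:
    #     rows = [(1, 10), (1, 11), (2, 10)]
    #     _process_many2many_dict(rows)            => {1: set([10, 11]),
    #                                                  2: set([10])}
    #     _process_many2many_dict(rows, flip=True) => {10: set([1, 2]),
    #                                                  11: set([1])}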

    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True

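    # Worked example of the rule above, with assumed ids: if label 5 is an
    # only_if_needed label on the host, the entry is rejected unless the job
    # depends on label 5 or the entry's meta_host is label 5; labels that are
    # not only_if_needed never block scheduling here.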

    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels, if any, or None otherwise. Logs an error if more than one
        atomic group is found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing. Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()

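    # Illustrative sketch with assumed values: for host_labels [1, 2] where
    # label 1 has atomic_group_id None and label 2 has atomic_group_id 7,
    # this returns 7; with no atomic labels it returns None; and if two
    # different atomic group ids are present, one of them is returned
    # arbitrarily (set.pop()) after the error above is logged.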

    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
433 """
434 return (id for id, label in self._labels.iteritems()
435 if label.atomic_group_id == atomic_group_id
436 and not label.invalid)
437
438
showard54c1ea92009-05-20 00:32:58 +0000439 def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
showard89f84db2009-03-12 20:39:13 +0000440 """
441 @param group_hosts - A sequence of Host ids to test for usability
442 and eligibility against the Job associated with queue_entry.
443 @param queue_entry - The HostQueueEntry that these hosts are being
444 tested for eligibility against.
445
446 @returns A subset of group_hosts Host ids that are eligible for the
447 supplied queue_entry.
448 """
449 return set(host_id for host_id in group_hosts
450 if self._is_host_usable(host_id)
451 and self._is_host_eligible_for_job(host_id, queue_entry))
452
453
showard989f25d2008-10-01 11:38:11 +0000454 def _is_host_eligible_for_job(self, host_id, queue_entry):
showard2924b0a2009-06-18 23:16:15 +0000455 if self._is_host_invalid(host_id):
456 # if an invalid host is scheduled for a job, it's a one-time host
457 # and it therefore bypasses eligibility checks. note this can only
458 # happen for non-metahosts, because invalid hosts have their label
459 # relationships cleared.
460 return True
461
showard989f25d2008-10-01 11:38:11 +0000462 job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
463 host_labels = self._host_labels.get(host_id, set())
mblighc993bee2008-10-03 03:42:34 +0000464
showard89f84db2009-03-12 20:39:13 +0000465 return (self._is_acl_accessible(host_id, queue_entry) and
466 self._check_job_dependencies(job_dependencies, host_labels) and
467 self._check_only_if_needed_labels(
468 job_dependencies, host_labels, queue_entry) and
469 self._check_atomic_group_labels(host_labels, queue_entry))
showard989f25d2008-10-01 11:38:11 +0000470
471
showard2924b0a2009-06-18 23:16:15 +0000472 def _is_host_invalid(self, host_id):
473 host_object = self._hosts_available.get(host_id, None)
474 return host_object and host_object.invalid
475
476
showard63a34772008-08-18 19:32:50 +0000477 def _schedule_non_metahost(self, queue_entry):
showard989f25d2008-10-01 11:38:11 +0000478 if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000479 return None
480 return self._hosts_available.pop(queue_entry.host_id, None)
481
482
483 def _is_host_usable(self, host_id):
484 if host_id not in self._hosts_available:
485 # host was already used during this scheduling cycle
486 return False
487 if self._hosts_available[host_id].invalid:
488 # Invalid hosts cannot be used for metahosts. They're included in
489 # the original query because they can be used by non-metahosts.
490 return False
491 return True
492
493
494 def _schedule_metahost(self, queue_entry):
495 label_id = queue_entry.meta_host
496 hosts_in_label = self._label_hosts.get(label_id, set())
497 ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
498 set())
499
500 # must iterate over a copy so we can mutate the original while iterating
501 for host_id in list(hosts_in_label):
502 if not self._is_host_usable(host_id):
503 hosts_in_label.remove(host_id)
504 continue
505 if host_id in ineligible_host_ids:
506 continue
showard989f25d2008-10-01 11:38:11 +0000507 if not self._is_host_eligible_for_job(host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000508 continue
509
showard89f84db2009-03-12 20:39:13 +0000510 # Remove the host from our cached internal state before returning
511 # the host object.
showard63a34772008-08-18 19:32:50 +0000512 hosts_in_label.remove(host_id)
513 return self._hosts_available.pop(host_id)
514 return None
515
516
517 def find_eligible_host(self, queue_entry):
518 if not queue_entry.meta_host:
showard89f84db2009-03-12 20:39:13 +0000519 assert queue_entry.host_id is not None
showard63a34772008-08-18 19:32:50 +0000520 return self._schedule_non_metahost(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000521 assert queue_entry.atomic_group_id is None
showard63a34772008-08-18 19:32:50 +0000522 return self._schedule_metahost(queue_entry)
523
524
showard89f84db2009-03-12 20:39:13 +0000525 def find_eligible_atomic_group(self, queue_entry):
526 """
527 Given an atomic group host queue entry, locate an appropriate group
528 of hosts for the associated job to run on.
529
530 The caller is responsible for creating new HQEs for the additional
531 hosts returned in order to run the actual job on them.
532
533 @returns A list of Host instances in a ready state to satisfy this
534 atomic group scheduling. Hosts will all belong to the same
535 atomic group label as specified by the queue_entry.
536 An empty list will be returned if no suitable atomic
537 group could be found.
538
539 TODO(gps): what is responsible for kicking off any attempted repairs on
540 a group of hosts? not this function, but something needs to. We do
541 not communicate that reason for returning [] outside of here...
542 For now, we'll just be unschedulable if enough hosts within one group
543 enter Repair Failed state.
544 """
545 assert queue_entry.atomic_group_id is not None
546 job = queue_entry.job
547 assert job.synch_count and job.synch_count > 0
showard77182562009-06-10 00:16:05 +0000548 atomic_group = queue_entry.atomic_group
showard89f84db2009-03-12 20:39:13 +0000549 if job.synch_count > atomic_group.max_number_of_machines:
550 # Such a Job and HostQueueEntry should never be possible to
551 # create using the frontend. Regardless, we can't process it.
552 # Abort it immediately and log an error on the scheduler.
553 queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
showard7629f142009-03-27 21:02:02 +0000554 logging.error(
555 'Error: job %d synch_count=%d > requested atomic_group %d '
556 'max_number_of_machines=%d. Aborted host_queue_entry %d.',
557 job.id, job.synch_count, atomic_group.id,
558 atomic_group.max_number_of_machines, queue_entry.id)
showard89f84db2009-03-12 20:39:13 +0000559 return []
560 hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
561 ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
562 set())
563
564 # Look in each label associated with atomic_group until we find one with
565 # enough hosts to satisfy the job.
566 for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
567 group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
568 if queue_entry.meta_host is not None:
569 # If we have a metahost label, only allow its hosts.
570 group_hosts.intersection_update(hosts_in_label)
571 group_hosts -= ineligible_host_ids
showard54c1ea92009-05-20 00:32:58 +0000572 eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
showard89f84db2009-03-12 20:39:13 +0000573 group_hosts, queue_entry)
574
575 # Job.synch_count is treated as "minimum synch count" when
576 # scheduling for an atomic group of hosts. The atomic group
577 # number of machines is the maximum to pick out of a single
578 # atomic group label for scheduling at one time.
579 min_hosts = job.synch_count
580 max_hosts = atomic_group.max_number_of_machines
581
showard54c1ea92009-05-20 00:32:58 +0000582 if len(eligible_host_ids_in_group) < min_hosts:
showard89f84db2009-03-12 20:39:13 +0000583 # Not enough eligible hosts in this atomic group label.
584 continue
585
showard54c1ea92009-05-20 00:32:58 +0000586 eligible_hosts_in_group = [self._hosts_available[id]
587 for id in eligible_host_ids_in_group]
showardef519212009-05-08 02:29:53 +0000588 # So that they show up in a sane order when viewing the job.
showard54c1ea92009-05-20 00:32:58 +0000589 eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)
showardef519212009-05-08 02:29:53 +0000590
showard89f84db2009-03-12 20:39:13 +0000591 # Limit ourselves to scheduling the atomic group size.
592 if len(eligible_hosts_in_group) > max_hosts:
showardef519212009-05-08 02:29:53 +0000593 eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]
showard89f84db2009-03-12 20:39:13 +0000594
595 # Remove the selected hosts from our cached internal state
596 # of available hosts in order to return the Host objects.
597 host_list = []
showard54c1ea92009-05-20 00:32:58 +0000598 for host in eligible_hosts_in_group:
599 hosts_in_label.discard(host.id)
600 self._hosts_available.pop(host.id)
601 host_list.append(host)
showard89f84db2009-03-12 20:39:13 +0000602 return host_list
603
604 return []
605
606
showard170873e2009-01-07 00:22:26 +0000607class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000608 def __init__(self):
609 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000610 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000611 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000612 user_cleanup_time = scheduler_config.config.clean_interval
613 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
614 _db, user_cleanup_time)
615 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000616 self._host_agents = {}
617 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000618
mbligh36768f02008-02-22 18:28:33 +0000619
showard915958d2009-04-22 21:00:58 +0000620 def initialize(self, recover_hosts=True):
621 self._periodic_cleanup.initialize()
622 self._24hr_upkeep.initialize()
623
jadmanski0afbb632008-06-06 21:10:57 +0000624 # always recover processes
625 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000626
jadmanski0afbb632008-06-06 21:10:57 +0000627 if recover_hosts:
628 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000629
630
jadmanski0afbb632008-06-06 21:10:57 +0000631 def tick(self):
showard170873e2009-01-07 00:22:26 +0000632 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000633 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000634 self._find_aborting()
showard1ff7b2e2009-05-15 23:17:18 +0000635 self._find_reverify()
showard29f7cd22009-04-29 21:16:24 +0000636 self._process_recurring_runs()
jadmanski0afbb632008-06-06 21:10:57 +0000637 self._schedule_new_jobs()
638 self._handle_agents()
showard170873e2009-01-07 00:22:26 +0000639 _drone_manager.execute_actions()
640 email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000641
showard97aed502008-11-04 02:01:24 +0000642
mblighf3294cc2009-04-08 21:17:38 +0000643 def _run_cleanup(self):
644 self._periodic_cleanup.run_cleanup_maybe()
645 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000646
mbligh36768f02008-02-22 18:28:33 +0000647
showard170873e2009-01-07 00:22:26 +0000648 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
649 for object_id in object_ids:
650 agent_dict.setdefault(object_id, set()).add(agent)
651
652
653 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
654 for object_id in object_ids:
655 assert object_id in agent_dict
656 agent_dict[object_id].remove(agent)
657
658
jadmanski0afbb632008-06-06 21:10:57 +0000659 def add_agent(self, agent):
660 self._agents.append(agent)
661 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000662 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
663 self._register_agent_for_ids(self._queue_entry_agents,
664 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000665
showard170873e2009-01-07 00:22:26 +0000666
667 def get_agents_for_entry(self, queue_entry):
668 """
669 Find agents corresponding to the specified queue_entry.
670 """
showardd3dc1992009-04-22 21:01:40 +0000671 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000672
673
674 def host_has_agent(self, host):
675 """
676 Determine if there is currently an Agent present using this host.
677 """
678 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000679
680
jadmanski0afbb632008-06-06 21:10:57 +0000681 def remove_agent(self, agent):
682 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000683 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
684 agent)
685 self._unregister_agent_for_ids(self._queue_entry_agents,
686 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000687
688
jadmanski0afbb632008-06-06 21:10:57 +0000689 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000690 self._register_pidfiles()
691 _drone_manager.refresh()
showardd3dc1992009-04-22 21:01:40 +0000692 self._recover_all_recoverable_entries()
showard6af73ad2009-07-28 20:00:58 +0000693 self._requeue_starting_entries()
showard6878e8b2009-07-20 22:37:45 +0000694 self._check_for_remaining_active_entries()
showard170873e2009-01-07 00:22:26 +0000695 self._reverify_remaining_hosts()
696 # reinitialize drones after killing orphaned processes, since they can
697 # leave around files when they die
698 _drone_manager.execute_actions()
699 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000700
showard170873e2009-01-07 00:22:26 +0000701
702 def _register_pidfiles(self):
703 # during recovery we may need to read pidfiles for both running and
704 # parsing entries
705 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000706 where="status IN ('Running', 'Gathering', 'Parsing')")
showarded2afea2009-07-07 20:54:07 +0000707 special_tasks = models.SpecialTask.objects.filter(is_active=True)
708 for execution_entry in itertools.chain(queue_entries, special_tasks):
showardd3dc1992009-04-22 21:01:40 +0000709 for pidfile_name in _ALL_PIDFILE_NAMES:
710 pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +0000711 execution_entry.execution_path(), pidfile_name=pidfile_name)
showardd3dc1992009-04-22 21:01:40 +0000712 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000713
714
showarded2afea2009-07-07 20:54:07 +0000715 def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
716 run_monitor = PidfileRunMonitor()
717 run_monitor.attach_to_existing_process(execution_path,
718 pidfile_name=pidfile_name)
719 if run_monitor.has_process():
720 orphans.discard(run_monitor.get_process())
721 return run_monitor, '(process %s)' % run_monitor.get_process()
722 return None, 'without process'
723
724
showardd3dc1992009-04-22 21:01:40 +0000725 def _recover_entries_with_status(self, status, orphans, pidfile_name,
726 recover_entries_fn):
727 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000728 for queue_entry in queue_entries:
729 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000730 # synchronous job we've already recovered
731 continue
            group_entries = queue_entry.job.get_group_entries(queue_entry)
            run_monitor, process_string = self._get_recovery_run_monitor(
                queue_entry.execution_path(), pidfile_name, orphans)

            logging.info('Recovering %s entry %s %s', status.lower(),
                         ', '.join(str(entry) for entry in group_entries),
                         process_string)
            recover_entries_fn(queue_entry.job, group_entries, run_monitor)


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_running_entries(self, orphans):
        def recover_entries(job, queue_entries, run_monitor):
            if run_monitor is not None:
                queue_task = QueueTask(job=job, queue_entries=queue_entries,
                                       recover_run_monitor=run_monitor)
                self.add_agent(Agent(tasks=[queue_task],
                                     num_processes=len(queue_entries)))
            else:
                # we could do better, but this retains legacy behavior for now
                for queue_entry in queue_entries:
                    logging.info('Requeuing running HQE %s since it has no '
                                 'process' % queue_entry)
                    queue_entry.requeue()

        self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
                                          orphans, _AUTOSERV_PID_FILE,
                                          recover_entries)


    def _recover_gathering_entries(self, orphans):
        def recover_entries(job, queue_entries, run_monitor):
            gather_task = GatherLogsTask(job, queue_entries,
                                         recover_run_monitor=run_monitor)
            self.add_agent(Agent([gather_task]))

        self._recover_entries_with_status(
            models.HostQueueEntry.Status.GATHERING,
            orphans, _CRASHINFO_PID_FILE, recover_entries)


    def _recover_parsing_entries(self, orphans):
        def recover_entries(job, queue_entries, run_monitor):
            reparse_task = FinalReparseTask(queue_entries,
                                            recover_run_monitor=run_monitor)
            self.add_agent(Agent([reparse_task], num_processes=0))

        self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
                                          orphans, _PARSER_PID_FILE,
                                          recover_entries)


    def _recover_all_recoverable_entries(self):
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._recover_special_tasks(orphans)
        self._check_for_remaining_orphan_processes(orphans)


    def _recover_special_tasks(self, orphans):
        """\
        Recovers all special tasks that have started running but have not
        completed.
        """

        tasks = models.SpecialTask.objects.filter(is_active=True,
                                                  is_complete=False)
        # Use ordering to force NULL queue_entry_id's to the end of the list
        for task in tasks.order_by('-queue_entry__id'):
            assert not self.host_has_agent(task.host)

            host = Host(id=task.host.id)
            queue_entry = None
            if task.queue_entry:
                queue_entry = HostQueueEntry(id=task.queue_entry.id)

            run_monitor, process_string = self._get_recovery_run_monitor(
                task.execution_path(), _AUTOSERV_PID_FILE, orphans)

            logging.info('Recovering %s %s', task, process_string)
            self._recover_special_task(task, host, queue_entry, run_monitor)


    def _recover_special_task(self, task, host, queue_entry, run_monitor):
        """\
        Recovers a single special task.
        """
        if task.task == models.SpecialTask.Task.VERIFY:
            agent_tasks = self._recover_verify(task, host, queue_entry,
                                               run_monitor)
        elif task.task == models.SpecialTask.Task.REPAIR:
            agent_tasks = self._recover_repair(task, host, queue_entry,
                                               run_monitor)
        elif task.task == models.SpecialTask.Task.CLEANUP:
            agent_tasks = self._recover_cleanup(task, host, queue_entry,
                                                run_monitor)
        else:
            # Should never happen
845 logging.error(
846 "Special task id %d had invalid task %s", (task.id, task.task))
847
848 self.add_agent(Agent(agent_tasks))


    def _recover_verify(self, task, host, queue_entry, run_monitor):
        """\
        Recovers a verify task.
        No associated queue entry: Verify host
        With associated queue entry: Verify host, and run associated queue
        entry
        """
        if not task.queue_entry:
            return [VerifyTask(host=host, task=task,
                               recover_run_monitor=run_monitor)]
        else:
            return [VerifyTask(queue_entry=queue_entry, task=task,
                               recover_run_monitor=run_monitor),
                    SetEntryPendingTask(queue_entry=queue_entry)]


    def _recover_repair(self, task, host, queue_entry, run_monitor):
        """\
        Recovers a repair task.
        Always repair host
        """
        return [RepairTask(host=host, queue_entry=queue_entry, task=task,
                           recover_run_monitor=run_monitor)]


    def _recover_cleanup(self, task, host, queue_entry, run_monitor):
        """\
        Recovers a cleanup task.
        No associated queue entry: Clean host
        With associated queue entry: Clean host, verify host if needed, and
        run associated queue entry
        """
        if not task.queue_entry:
            return [CleanupTask(host=host, task=task,
                                recover_run_monitor=run_monitor)]
        else:
            agent_tasks = [CleanupTask(queue_entry=queue_entry,
                                       task=task,
                                       recover_run_monitor=run_monitor)]
            if queue_entry.job.should_run_verify(queue_entry):
                agent_tasks.append(VerifyTask(queue_entry=queue_entry))
            agent_tasks.append(
                SetEntryPendingTask(queue_entry=queue_entry))
            return agent_tasks


    def _requeue_starting_entries(self):
        # temporary measure until we implement proper recovery of Starting HQEs
        for entry in HostQueueEntry.fetch(where='status="Starting"'):
            logging.info('Requeuing "Starting" queue entry %s' % entry)
            assert not self.get_agents_for_entry(entry)
            assert entry.host.status == models.Host.Status.PENDING
            self._reverify_hosts_where('id = %s' % entry.host.id)
            entry.requeue()


    def _check_for_remaining_active_entries(self):
        queue_entries = HostQueueEntry.fetch(
            where='active AND NOT complete AND '
                  '(aborted OR status != "Pending")')

        message = '\n'.join(str(entry) for entry in queue_entries
                            if not self.get_agents_for_entry(entry))
        if message:
            email_manager.manager.enqueue_notify_email(
                'Unrecovered active host queue entries exist',
                message)


    def _find_reverify(self):
        tasks = models.SpecialTask.objects.filter(
            task=models.SpecialTask.Task.VERIFY, is_active=False,
            is_complete=False, queue_entry__isnull=True)

        for task in tasks:
            host = Host.fetch(where='id = %s', params=(task.host.id,)).next()
            if host.locked or host.invalid or self.host_has_agent(host):
                continue

            logging.info('Force reverifying host %s', host.hostname)
            self.add_agent(Agent([VerifyTask(host=host, task=task)]))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
            "status IN ('Repairing', 'Verifying', 'Cleaning', 'Running')",
            print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            tasks = host.reverify_tasks()
            self.add_agent(Agent(tasks))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
            queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))
        # The first assigned host uses the original HostQueueEntry
        group_queue_entries = [queue_entry]
        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            group_queue_entries.append(new_hqe)
        assert len(group_queue_entries) == len(group_hosts)
        for queue_entry, host in itertools.izip(group_queue_entries,
                                                group_hosts):
            self._run_queue_entry(queue_entry, host)


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        for queue_entry in queue_entries:
            if (queue_entry.atomic_group_id is None or
                queue_entry.host_id is not None):
                assigned_host = self._host_scheduler.find_eligible_host(
                    queue_entry)
                if assigned_host:
                    self._run_queue_entry(queue_entry, assigned_host)
            else:
                self._schedule_atomic_group(queue_entry)


    def _run_queue_entry(self, queue_entry, host):
        agent = queue_entry.run_pre_job_tasks(assigned_host=host)
        self.add_agent(agent)


    def _find_aborting(self):
        for entry in HostQueueEntry.fetch(where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        if agent.num_processes > _drone_manager.max_runnable_processes():
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
            scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True

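    # Illustrative walk-through, assuming max_processes_started_per_cycle = 10
    # (an example value) and ample drone capacity:
    #     agent.num_processes == 0                    -> always allowed
    #     num_started_this_cycle == 0, agent needs 15 -> allowed; the first
    #                                                    agent may exceed the
    #                                                    per-cycle cap
    #     num_started_this_cycle == 8, agent needs 3  -> 8 + 3 > 10, refused
    # Once any agent is refused, _handle_agents() sets have_reached_limit, so
    # every later nonzero-process agent is refused for the rest of the cycle;
    # this keeps small agents from starving larger ones.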

    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                # TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            log_file=log_file, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_path, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001239
showard21baa452008-10-21 00:08:39 +00001240
1241 def _get_pidfile_info(self):
1242 """\
1243 After completion, self._state will contain:
1244 pid=None, exit_status=None if autoserv has not yet run
1245 pid!=None, exit_status=None if autoserv is running
1246 pid!=None, exit_status!=None if autoserv has completed
1247 """
1248 try:
1249 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001250 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001251 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001252
1253
showard170873e2009-01-07 00:22:26 +00001254 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001255 """\
1256 Called when no pidfile is found or no pid is in the pidfile.
1257 """
showard170873e2009-01-07 00:22:26 +00001258 message = 'No pid found at %s' % self.pidfile_id
showard170873e2009-01-07 00:22:26 +00001259 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1260 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001261 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001262 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001263
1264
showard35162b02009-03-03 02:17:30 +00001265 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001266 """\
1267 Called when autoserv has exited without writing an exit status,
1268 or we've timed out waiting for autoserv to write a pid to the
1269 pidfile. In either case, we just return failure and the caller
1270 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001271
showard170873e2009-01-07 00:22:26 +00001272 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001273 """
1274 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001275 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001276 self._state.exit_status = 1
1277 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001278
1279
jadmanski0afbb632008-06-06 21:10:57 +00001280 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001281 self._get_pidfile_info()
1282 return self._state.exit_status
1283
1284
1285 def num_tests_failed(self):
1286 self._get_pidfile_info()
1287 assert self._state.num_tests_failed is not None
1288 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001289
1290
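# Usage sketch for PidfileRunMonitor (illustration only; command and
# working_directory are hypothetical values). Mode 1 launches a fresh
# process, mode 2 re-attaches to one left over from before a scheduler
# restart:
#
#     monitor = PidfileRunMonitor()
#     monitor.run(command, working_directory,
#                 nice_level=AUTOSERV_NICE_LEVEL,
#                 pidfile_name=_AUTOSERV_PID_FILE)
#     # ... or ...
#     monitor = PidfileRunMonitor()
#     monitor.attach_to_existing_process(execution_path)
#
#     # Either way, exit_code() is None while the process is still running
#     # and an integer exit status once it has finished:
#     if monitor.exit_code() is not None:
#         succeeded = (monitor.exit_code() == 0)

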
mbligh36768f02008-02-22 18:28:33 +00001291class Agent(object):
showard77182562009-06-10 00:16:05 +00001292 """
1293 An agent for use by the Dispatcher class to perform a sequence of tasks.
1294
1295 The following methods are required on all task objects:
1296 poll() - Called periodically to let the task check its status and
1297                update its internal state.
1298 is_done() - Returns True if the task is finished.
1299 abort() - Called when an abort has been requested. The task must
1300 set its aborted attribute to True if it actually aborted.
1301
1302 The following attributes are required on all task objects:
1303 aborted - bool, True if this task was aborted.
1304 failure_tasks - A sequence of tasks to be run using a new Agent
1305 by the dispatcher should this task fail.
1306 success - bool, True if this task succeeded.
1307 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1308 host_ids - A sequence of Host ids this task represents.
1309
1310 The following attribute is written to all task objects:
1311 agent - A reference to the Agent instance that the task has been
1312 added to.
1313 """
1314
1315
showard170873e2009-01-07 00:22:26 +00001316 def __init__(self, tasks, num_processes=1):
showard77182562009-06-10 00:16:05 +00001317 """
1318 @param tasks: A list of tasks as described in the class docstring.
1319 @param num_processes: The number of subprocesses the Agent represents.
1320 This is used by the Dispatcher for managing the load on the
1321 system. Defaults to 1.
1322 """
jadmanski0afbb632008-06-06 21:10:57 +00001323 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001324 self.queue = None
showard77182562009-06-10 00:16:05 +00001325 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001326 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001327 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001328
showard170873e2009-01-07 00:22:26 +00001329 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1330 for task in tasks)
1331 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1332
showardd3dc1992009-04-22 21:01:40 +00001333 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001334 for task in tasks:
1335 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001336
1337
showardd3dc1992009-04-22 21:01:40 +00001338 def _clear_queue(self):
1339 self.queue = Queue.Queue(0)
1340
1341
showard170873e2009-01-07 00:22:26 +00001342 def _union_ids(self, id_lists):
1343 return set(itertools.chain(*id_lists))
1344
1345
jadmanski0afbb632008-06-06 21:10:57 +00001346 def add_task(self, task):
1347 self.queue.put_nowait(task)
1348 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001349
1350
jadmanski0afbb632008-06-06 21:10:57 +00001351 def tick(self):
showard21baa452008-10-21 00:08:39 +00001352 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001353 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001354 self.active_task.poll()
1355 if not self.active_task.is_done():
1356 return
1357 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001358
1359
jadmanski0afbb632008-06-06 21:10:57 +00001360 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001361 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001362 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001363 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001364 if not self.active_task.success:
1365 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001366 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001367
jadmanski0afbb632008-06-06 21:10:57 +00001368 if not self.is_done():
1369 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001370
1371
jadmanski0afbb632008-06-06 21:10:57 +00001372 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001373 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001374 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1375 # get reset.
1376 new_agent = Agent(self.active_task.failure_tasks)
1377 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001378
mblighe2586682008-02-29 22:45:46 +00001379
showard4c5374f2008-09-04 17:02:56 +00001380 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001381 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001382
1383
jadmanski0afbb632008-06-06 21:10:57 +00001384 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001385 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001386
1387
showardd3dc1992009-04-22 21:01:40 +00001388 def abort(self):
showard08a36412009-05-05 01:01:13 +00001389 # abort tasks until the queue is empty or a task ignores the abort
1390 while not self.is_done():
1391 if not self.active_task:
1392 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001393 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001394 if not self.active_task.aborted:
1395 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001396 return
1397 self.active_task = None
1398
showardd3dc1992009-04-22 21:01:40 +00001399
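# A minimal sketch of the task interface the Agent class expects
# (illustration only; this class is hypothetical and unused by the
# scheduler -- DelayedCallTask below is a real in-tree example):
class _ExampleLogTask(object):
    def __init__(self, message):
        self._message = message
        # Attributes required by Agent:
        self.aborted = False
        self.failure_tasks = ()
        self.success = False
        self.host_ids = ()
        self.queue_entry_ids = ()
        self.agent = None  # filled in by Agent.add_task()


    def poll(self):
        # Do one unit of work, then mark ourselves finished.
        logging.info(self._message)
        self.success = True


    def is_done(self):
        return self.success or self.aborted


    def abort(self):
        self.aborted = True
#
# It would run as: dispatcher.add_agent(Agent([_ExampleLogTask('hi')],
# num_processes=0)), where num_processes=0 tells the Dispatcher this agent
# holds no subprocess slot.

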
showard77182562009-06-10 00:16:05 +00001400class DelayedCallTask(object):
1401 """
1402 A task object like AgentTask for an Agent to run that waits for the
1403 specified amount of time to have elapsed before calling the supplied
1404 callback once and finishing. If the callback returns anything, it is
1405 assumed to be a new Agent instance and will be added to the dispatcher.
1406
1407 @attribute end_time: The absolute posix time after which this task will
1408 call its callback when it is polled and be finished.
1409
1410 Also has all attributes required by the Agent class.
1411 """
1412 def __init__(self, delay_seconds, callback, now_func=None):
1413 """
1414 @param delay_seconds: The delay in seconds from now that this task
1415 will call the supplied callback and be done.
1416 @param callback: A callable to be called by this task once after at
1417 least delay_seconds time has elapsed. It must return None
1418 or a new Agent instance.
1419 @param now_func: A time.time like function. Default: time.time.
1420 Used for testing.
1421 """
1422 assert delay_seconds > 0
1423 assert callable(callback)
1424 if not now_func:
1425 now_func = time.time
1426 self._now_func = now_func
1427 self._callback = callback
1428
1429 self.end_time = self._now_func() + delay_seconds
1430
1431 # These attributes are required by Agent.
1432 self.aborted = False
1433 self.failure_tasks = ()
1434 self.host_ids = ()
1435 self.success = False
1436 self.queue_entry_ids = ()
1437 # This is filled in by Agent.add_task().
1438 self.agent = None
1439
1440
1441 def poll(self):
1442 if self._callback and self._now_func() >= self.end_time:
1443 new_agent = self._callback()
1444 if new_agent:
1445 self.agent.dispatcher.add_agent(new_agent)
1446 self._callback = None
1447 self.success = True
1448
1449
1450 def is_done(self):
1451 return not self._callback
1452
1453
1454 def abort(self):
1455 self.aborted = True
1456 self._callback = None
1457
1458
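# Usage sketch (illustration only; dispatcher and _on_timeout are
# hypothetical names): wrap a DelayedCallTask in its own Agent to run a
# callback once, five minutes from now:
#
#     task = DelayedCallTask(delay_seconds=300, callback=_on_timeout)
#     dispatcher.add_agent(Agent([task], num_processes=0))

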
mbligh36768f02008-02-22 18:28:33 +00001459class AgentTask(object):
showarded2afea2009-07-07 20:54:07 +00001460    def __init__(self, cmd=None, working_directory=None, failure_tasks=(),
1461 pidfile_name=None, paired_with_pidfile=None,
1462 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001463 self.done = False
1464 self.failure_tasks = failure_tasks
jadmanski0afbb632008-06-06 21:10:57 +00001465 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001466 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001467 self.agent = None
showarded2afea2009-07-07 20:54:07 +00001468 self.monitor = recover_run_monitor
1469 self.started = bool(recover_run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +00001470 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001471 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001472 self.queue_entry_ids = []
1473 self.host_ids = []
1474 self.log_file = None
1475
1476
1477 def _set_ids(self, host=None, queue_entries=None):
1478 if queue_entries and queue_entries != [None]:
1479 self.host_ids = [entry.host.id for entry in queue_entries]
1480 self.queue_entry_ids = [entry.id for entry in queue_entries]
1481 else:
1482 assert host
1483 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001484
1485
jadmanski0afbb632008-06-06 21:10:57 +00001486 def poll(self):
showard08a36412009-05-05 01:01:13 +00001487 if not self.started:
1488 self.start()
1489 self.tick()
1490
1491
1492 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001493 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001494 exit_code = self.monitor.exit_code()
1495 if exit_code is None:
1496 return
1497 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001498 else:
1499 success = False
mbligh36768f02008-02-22 18:28:33 +00001500
jadmanski0afbb632008-06-06 21:10:57 +00001501 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001502
1503
jadmanski0afbb632008-06-06 21:10:57 +00001504 def is_done(self):
1505 return self.done
mbligh36768f02008-02-22 18:28:33 +00001506
1507
jadmanski0afbb632008-06-06 21:10:57 +00001508 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001509 if self.done:
1510 return
jadmanski0afbb632008-06-06 21:10:57 +00001511 self.done = True
1512 self.success = success
1513 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001514
1515
jadmanski0afbb632008-06-06 21:10:57 +00001516 def prolog(self):
showarded2afea2009-07-07 20:54:07 +00001517 assert not self.monitor
mblighd64e5702008-04-04 21:39:28 +00001518
mbligh36768f02008-02-22 18:28:33 +00001519
jadmanski0afbb632008-06-06 21:10:57 +00001520 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001521 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001522 _drone_manager.copy_to_results_repository(
1523 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001524
1525
jadmanski0afbb632008-06-06 21:10:57 +00001526 def epilog(self):
1527 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001528
1529
jadmanski0afbb632008-06-06 21:10:57 +00001530 def start(self):
1531 assert self.agent
1532
1533 if not self.started:
1534 self.prolog()
1535 self.run()
1536
1537 self.started = True
1538
1539
1540 def abort(self):
1541 if self.monitor:
1542 self.monitor.kill()
1543 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001544 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001545 self.cleanup()
1546
1547
showarded2afea2009-07-07 20:54:07 +00001548 def _get_consistent_execution_path(self, execution_entries):
1549 first_execution_path = execution_entries[0].execution_path()
1550 for execution_entry in execution_entries[1:]:
1551 assert execution_entry.execution_path() == first_execution_path, (
1552 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1553 execution_entry,
1554 first_execution_path,
1555 execution_entries[0]))
1556 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001557
1558
showarded2afea2009-07-07 20:54:07 +00001559 def _copy_results(self, execution_entries, use_monitor=None):
1560 """
1561 @param execution_entries: list of objects with execution_path() method
1562 """
1563 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001564 if use_monitor is None:
1565 assert self.monitor
1566 use_monitor = self.monitor
1567 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001568 execution_path = self._get_consistent_execution_path(execution_entries)
1569 results_path = execution_path + '/'
showard6b733412009-04-27 20:09:18 +00001570 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001571 results_path)
showardde634ee2009-01-30 01:44:24 +00001572
showarda1e74b32009-05-12 17:32:04 +00001573
1574 def _parse_results(self, queue_entries):
showardde634ee2009-01-30 01:44:24 +00001575 reparse_task = FinalReparseTask(queue_entries)
1576 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1577
1578
showarda1e74b32009-05-12 17:32:04 +00001579 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1580 self._copy_results(queue_entries, use_monitor)
1581 self._parse_results(queue_entries)
1582
1583
showardd3dc1992009-04-22 21:01:40 +00001584 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
showarded2afea2009-07-07 20:54:07 +00001585 assert not self.monitor
jadmanski0afbb632008-06-06 21:10:57 +00001586 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001587 self.monitor = PidfileRunMonitor()
1588 self.monitor.run(self.cmd, self._working_directory,
1589 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001590 log_file=self.log_file,
1591 pidfile_name=pidfile_name,
1592 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001593
1594
showardd9205182009-04-27 20:09:55 +00001595class TaskWithJobKeyvals(object):
1596 """AgentTask mixin providing functionality to help with job keyval files."""
1597 _KEYVAL_FILE = 'keyval'
1598 def _format_keyval(self, key, value):
1599 return '%s=%s' % (key, value)
1600
1601
1602 def _keyval_path(self):
1603 """Subclasses must override this"""
1604        raise NotImplementedError('Subclasses must override this')
1605
1606
1607 def _write_keyval_after_job(self, field, value):
1608 assert self.monitor
1609 if not self.monitor.has_process():
1610 return
1611 _drone_manager.write_lines_to_file(
1612 self._keyval_path(), [self._format_keyval(field, value)],
1613 paired_with_process=self.monitor.get_process())
1614
1615
1616 def _job_queued_keyval(self, job):
1617 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1618
1619
1620 def _write_job_finished(self):
1621 self._write_keyval_after_job("job_finished", int(time.time()))
1622
1623
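# The keyval file the mixin above writes is plain "key=value" lines, one per
# call, e.g. (timestamps illustrative):
#
#     job_queued=1233358892
#     job_finished=1233362101

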
showarded2afea2009-07-07 20:54:07 +00001624class SpecialAgentTask(AgentTask):
1625 """
1626 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1627 """
1628
1629 TASK_TYPE = None
1630 host = None
1631 queue_entry = None
1632
1633 def __init__(self, task, extra_command_args, **kwargs):
1634 assert self.host
1635        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
1637 self.task = task
showard8ac6f2a2009-07-16 14:50:32 +00001638 if task:
1639 kwargs['working_directory'] = task.execution_path()
showarded2afea2009-07-07 20:54:07 +00001640 self._extra_command_args = extra_command_args
1641 super(SpecialAgentTask, self).__init__(**kwargs)
1642
1643
1644 def prolog(self):
1645 super(SpecialAgentTask, self).prolog()
1646 self.task = models.SpecialTask.prepare(self, self.task)
1647 self.cmd = _autoserv_command_line(self.host.hostname,
1648 self._extra_command_args,
1649 queue_entry=self.queue_entry)
1650 self._working_directory = self.task.execution_path()
1651 self.task.activate()
1652
1653
showardb6681aa2009-07-08 21:15:00 +00001654 def cleanup(self):
1655 super(SpecialAgentTask, self).cleanup()
showarda640b2d2009-07-20 22:37:06 +00001656
1657 # self.task can be None if a SpecialAgentTask is aborted before the
1658 # prolog runs
1659 if self.task:
1660 self.task.finish()
1661
1662 if self.monitor and self.monitor.has_process() and self.task:
showarded2afea2009-07-07 20:54:07 +00001663 self._copy_results([self.task])
1664
1665
1666class RepairTask(SpecialAgentTask, TaskWithJobKeyvals):
1667 TASK_TYPE = models.SpecialTask.Task.REPAIR
1668
1669
1670 def __init__(self, host, queue_entry=None, task=None,
1671 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001672 """\
showard170873e2009-01-07 00:22:26 +00001673 queue_entry: queue entry to mark failed if this repair fails.
jadmanski0afbb632008-06-06 21:10:57 +00001674 """
jadmanskifb7cfb12008-07-09 14:13:21 +00001675 protection = host_protections.Protection.get_string(host.protection)
jadmanski542537f2008-07-24 14:14:56 +00001676 # normalize the protection name
1677 protection = host_protections.Protection.get_attr_name(protection)
showard170873e2009-01-07 00:22:26 +00001678
jadmanski0afbb632008-06-06 21:10:57 +00001679 self.host = host
showardcfd4a7e2009-07-11 01:47:33 +00001680 self.queue_entry = None
1681 # recovery code can pass a HQE that's already been requeued. for a
1682 # metahost, that means the host has been unassigned. in that case,
1683 # ignore the HQE.
1684 hqe_still_assigned_to_this_host = (queue_entry and queue_entry.host
1685 and queue_entry.host.id == host.id)
1686 if hqe_still_assigned_to_this_host:
1687 self.queue_entry = queue_entry
showard170873e2009-01-07 00:22:26 +00001688
showarded2afea2009-07-07 20:54:07 +00001689 super(RepairTask, self).__init__(
1690 task, ['-R', '--host-protection', protection],
1691 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00001692
showard2fe3f1d2009-07-06 20:19:11 +00001693 # *don't* include the queue entry in IDs -- if the queue entry is
1694 # aborted, we want to leave the repair task running
1695 self._set_ids(host=host)
1696
mbligh36768f02008-02-22 18:28:33 +00001697
jadmanski0afbb632008-06-06 21:10:57 +00001698 def prolog(self):
showarded2afea2009-07-07 20:54:07 +00001699 super(RepairTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001700 logging.info("repair_task starting")
jadmanski0afbb632008-06-06 21:10:57 +00001701 self.host.set_status('Repairing')
showard2fe3f1d2009-07-06 20:19:11 +00001702 if self.queue_entry:
1703 self.queue_entry.requeue()
1704
mbligh36768f02008-02-22 18:28:33 +00001705
showardd9205182009-04-27 20:09:55 +00001706 def _keyval_path(self):
showarded2afea2009-07-07 20:54:07 +00001707 return os.path.join(self._working_directory, self._KEYVAL_FILE)
showardd9205182009-04-27 20:09:55 +00001708
1709
showardde634ee2009-01-30 01:44:24 +00001710 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001711 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001712
showard2fe3f1d2009-07-06 20:19:11 +00001713 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001714 return # don't fail metahost entries, they'll be reassigned
1715
showard2fe3f1d2009-07-06 20:19:11 +00001716 self.queue_entry.update_from_database()
1717 if self.queue_entry.status != 'Queued':
showardccbd6c52009-03-21 00:10:21 +00001718 return # entry has been aborted
1719
showard2fe3f1d2009-07-06 20:19:11 +00001720 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001721 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001722 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001723 self._write_keyval_after_job(queued_key, queued_time)
1724 self._write_job_finished()
showard678df4f2009-02-04 21:36:39 +00001725 # copy results logs into the normal place for job results
1726 _drone_manager.copy_results_on_drone(
1727 self.monitor.get_process(),
showarded2afea2009-07-07 20:54:07 +00001728 source_path=self._working_directory + '/',
1729 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001730
showard2fe3f1d2009-07-06 20:19:11 +00001731 self._copy_results([self.queue_entry])
1732 if self.queue_entry.job.parse_failed_repair:
1733 self._parse_results([self.queue_entry])
1734 self.queue_entry.handle_host_failure()
showardde634ee2009-01-30 01:44:24 +00001735
1736
jadmanski0afbb632008-06-06 21:10:57 +00001737 def epilog(self):
1738 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001739
jadmanski0afbb632008-06-06 21:10:57 +00001740 if self.success:
1741 self.host.set_status('Ready')
1742 else:
1743 self.host.set_status('Repair Failed')
showard2fe3f1d2009-07-06 20:19:11 +00001744 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001745 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001746
1747
showarded2afea2009-07-07 20:54:07 +00001748class PreJobTask(SpecialAgentTask):
showard170873e2009-01-07 00:22:26 +00001749 def epilog(self):
1750 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001751 should_copy_results = (self.queue_entry and not self.success
1752 and not self.queue_entry.meta_host)
1753 if should_copy_results:
1754 self.queue_entry.set_execution_subdir()
showarded2afea2009-07-07 20:54:07 +00001755 log_name = os.path.basename(self.task.execution_path())
1756 source = os.path.join(self.task.execution_path(), 'debug',
1757 'autoserv.DEBUG')
1758 destination = os.path.join(self.queue_entry.execution_path(),
1759 log_name)
showard170873e2009-01-07 00:22:26 +00001760 _drone_manager.copy_to_results_repository(
showarded2afea2009-07-07 20:54:07 +00001761 self.monitor.get_process(), source,
showard170873e2009-01-07 00:22:26 +00001762 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001763
1764
1765class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001766 TASK_TYPE = models.SpecialTask.Task.VERIFY
1767
1768
1769 def __init__(self, queue_entry=None, host=None, task=None,
1770 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001771 assert bool(queue_entry) != bool(host)
jadmanski0afbb632008-06-06 21:10:57 +00001772 self.host = host or queue_entry.host
1773 self.queue_entry = queue_entry
mbligh36768f02008-02-22 18:28:33 +00001774
showarde788ea62008-11-17 21:02:47 +00001775 failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
showarded2afea2009-07-07 20:54:07 +00001776 super(VerifyTask, self).__init__(
1777 task, ['-v'], failure_tasks=failure_tasks,
1778 recover_run_monitor=recover_run_monitor)
mblighe2586682008-02-29 22:45:46 +00001779
showard170873e2009-01-07 00:22:26 +00001780 self._set_ids(host=host, queue_entries=[queue_entry])
mblighe2586682008-02-29 22:45:46 +00001781
1782
jadmanski0afbb632008-06-06 21:10:57 +00001783 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001784 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001785
showardb18134f2009-03-20 20:52:18 +00001786 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001787 if self.queue_entry:
1788 self.queue_entry.set_status('Verifying')
jadmanski0afbb632008-06-06 21:10:57 +00001789 self.host.set_status('Verifying')
mbligh36768f02008-02-22 18:28:33 +00001790
showarded2afea2009-07-07 20:54:07 +00001791 # Delete any other queued verifies for this host. One verify will do
1792 # and there's no need to keep records of other requests.
1793 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00001794 host__id=self.host.id,
1795 task=models.SpecialTask.Task.VERIFY,
showarded2afea2009-07-07 20:54:07 +00001796 is_active=False, is_complete=False)
1797 queued_verifies = queued_verifies.exclude(id=self.task.id)
1798 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00001799
mbligh36768f02008-02-22 18:28:33 +00001800
jadmanski0afbb632008-06-06 21:10:57 +00001801 def epilog(self):
1802 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001803 if self.success:
jadmanski0afbb632008-06-06 21:10:57 +00001804 self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001805
1806
showardb5626452009-06-30 01:57:28 +00001807class CleanupHostsMixin(object):
1808 def _reboot_hosts(self, job, queue_entries, final_success,
1809 num_tests_failed):
1810 reboot_after = job.reboot_after
1811 do_reboot = (
1812                # always reboot after aborted jobs. _final_status is only
                # set on PostJobTask subclasses, so default to None when this
                # mixin is used by QueueTask directly.
                getattr(self, '_final_status', None) ==
                        models.HostQueueEntry.Status.ABORTED
1814 or reboot_after == models.RebootAfter.ALWAYS
1815 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
1816 and final_success and num_tests_failed == 0))
1817
1818 for queue_entry in queue_entries:
1819 if do_reboot:
1820 # don't pass the queue entry to the CleanupTask. if the cleanup
1821 # fails, the job doesn't care -- it's over.
1822 cleanup_task = CleanupTask(host=queue_entry.host)
1823 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
1824 else:
1825 queue_entry.host.set_status('Ready')
1826
1827
1828class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
showarded2afea2009-07-07 20:54:07 +00001829 def __init__(self, job, queue_entries, cmd=None, group_name='',
1830 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001831 self.job = job
1832 self.queue_entries = queue_entries
showardf1ae3542009-05-11 19:26:02 +00001833 self.group_name = group_name
showarded2afea2009-07-07 20:54:07 +00001834 super(QueueTask, self).__init__(
1835 cmd, self._execution_path(),
1836 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00001837 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001838
1839
showard73ec0442009-02-07 02:05:20 +00001840 def _keyval_path(self):
showarded2afea2009-07-07 20:54:07 +00001841 return os.path.join(self._execution_path(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001842
1843
1844 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1845 keyval_contents = '\n'.join(self._format_keyval(key, value)
1846 for key, value in keyval_dict.iteritems())
1847 # always end with a newline to allow additional keyvals to be written
1848 keyval_contents += '\n'
showarded2afea2009-07-07 20:54:07 +00001849 _drone_manager.attach_file_to_execution(self._execution_path(),
showard73ec0442009-02-07 02:05:20 +00001850 keyval_contents,
1851 file_path=keyval_path)
1852
1853
1854 def _write_keyvals_before_job(self, keyval_dict):
1855 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1856
1857
showard170873e2009-01-07 00:22:26 +00001858 def _write_host_keyvals(self, host):
showarded2afea2009-07-07 20:54:07 +00001859 keyval_path = os.path.join(self._execution_path(), 'host_keyvals',
showard170873e2009-01-07 00:22:26 +00001860 host.hostname)
1861 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001862 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1863 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001864
1865
showarded2afea2009-07-07 20:54:07 +00001866 def _execution_path(self):
1867 return self.queue_entries[0].execution_path()
mblighbb421852008-03-11 22:36:16 +00001868
1869
jadmanski0afbb632008-06-06 21:10:57 +00001870 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001871 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001872 keyval_dict = {queued_key: queued_time}
1873 if self.group_name:
1874 keyval_dict['host_group_name'] = self.group_name
1875 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001876 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001877 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001878 queue_entry.set_status('Running')
showard12f3e322009-05-13 21:27:42 +00001879 queue_entry.update_field('started_on', datetime.datetime.now())
jadmanski0afbb632008-06-06 21:10:57 +00001880 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001881 queue_entry.host.update_field('dirty', 1)
showardc6a56872009-07-28 20:11:58 +00001882 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
1883 # TODO(gps): Remove this if nothing needs it anymore.
1884 # A potential user is: tko/parser
jadmanski0afbb632008-06-06 21:10:57 +00001885 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001886
1887
showard35162b02009-03-03 02:17:30 +00001888 def _write_lost_process_error_file(self):
showarded2afea2009-07-07 20:54:07 +00001889 error_file_path = os.path.join(self._execution_path(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001890 _drone_manager.write_lines_to_file(error_file_path,
1891 [_LOST_PROCESS_ERROR])
1892
1893
showardd3dc1992009-04-22 21:01:40 +00001894 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001895 if not self.monitor:
1896 return
1897
showardd9205182009-04-27 20:09:55 +00001898 self._write_job_finished()
1899
showardd3dc1992009-04-22 21:01:40 +00001900        # both of the following conditionals can be true; that happens iff
1901        # the process ran, wrote a pid to its pidfile, and then exited
        # without writing an exit code
showard35162b02009-03-03 02:17:30 +00001902 if self.monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +00001903 gather_task = GatherLogsTask(self.job, self.queue_entries)
1904 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showardb5626452009-06-30 01:57:28 +00001905 else:
1906 self._reboot_hosts(self.job, self.queue_entries,
1907 final_success=False, num_tests_failed=0)
showard35162b02009-03-03 02:17:30 +00001908
1909 if self.monitor.lost_process:
1910 self._write_lost_process_error_file()
1911 for queue_entry in self.queue_entries:
1912 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001913
1914
showardcbd74612008-11-19 21:42:02 +00001915 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001916 _drone_manager.write_lines_to_file(
showarded2afea2009-07-07 20:54:07 +00001917 os.path.join(self._execution_path(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001918 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001919 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001920
1921
jadmanskif7fa2cc2008-10-01 14:13:23 +00001922 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001923 if not self.monitor or not self.monitor.has_process():
1924 return
1925
jadmanskif7fa2cc2008-10-01 14:13:23 +00001926 # build up sets of all the aborted_by and aborted_on values
1927 aborted_by, aborted_on = set(), set()
1928 for queue_entry in self.queue_entries:
1929 if queue_entry.aborted_by:
1930 aborted_by.add(queue_entry.aborted_by)
1931 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1932 aborted_on.add(t)
1933
1934 # extract some actual, unique aborted by value and write it out
1935 assert len(aborted_by) <= 1
1936 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001937 aborted_by_value = aborted_by.pop()
1938 aborted_on_value = max(aborted_on)
1939 else:
1940 aborted_by_value = 'autotest_system'
1941 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001942
showarda0382352009-02-11 23:36:43 +00001943 self._write_keyval_after_job("aborted_by", aborted_by_value)
1944 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001945
showardcbd74612008-11-19 21:42:02 +00001946 aborted_on_string = str(datetime.datetime.fromtimestamp(
1947 aborted_on_value))
1948 self._write_status_comment('Job aborted by %s on %s' %
1949 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001950
1951
jadmanski0afbb632008-06-06 21:10:57 +00001952 def abort(self):
1953 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001954 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001955 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001956
1957
jadmanski0afbb632008-06-06 21:10:57 +00001958 def epilog(self):
1959 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001960 self._finish_task()
1961 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001962
1963
showardd3dc1992009-04-22 21:01:40 +00001964class PostJobTask(AgentTask):
1965 def __init__(self, queue_entries, pidfile_name, logfile_name,
showarded2afea2009-07-07 20:54:07 +00001966 recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00001967 self._queue_entries = queue_entries
1968 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00001969
showarded2afea2009-07-07 20:54:07 +00001970 self._execution_path = self._get_consistent_execution_path(
1971 queue_entries)
1972 self._results_dir = _drone_manager.absolute_path(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00001973 self._autoserv_monitor = PidfileRunMonitor()
showarded2afea2009-07-07 20:54:07 +00001974 self._autoserv_monitor.attach_to_existing_process(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00001975 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1976
1977 if _testing_mode:
1978 command = 'true'
1979 else:
1980 command = self._generate_command(self._results_dir)
1981
showarded2afea2009-07-07 20:54:07 +00001982 super(PostJobTask, self).__init__(
1983 cmd=command, working_directory=self._execution_path,
1984 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00001985
showarded2afea2009-07-07 20:54:07 +00001986 self.log_file = os.path.join(self._execution_path, logfile_name)
showardd3dc1992009-04-22 21:01:40 +00001987 self._final_status = self._determine_final_status()
1988
1989
1990 def _generate_command(self, results_dir):
1991 raise NotImplementedError('Subclasses must override this')
1992
1993
1994 def _job_was_aborted(self):
1995 was_aborted = None
1996 for queue_entry in self._queue_entries:
1997 queue_entry.update_from_database()
1998 if was_aborted is None: # first queue entry
1999 was_aborted = bool(queue_entry.aborted)
2000 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
2001 email_manager.manager.enqueue_notify_email(
2002 'Inconsistent abort state',
2003 'Queue entries have inconsistent abort state: ' +
2004                        ', '.join('%s (%s)' % (entry, entry.aborted)
                                  for entry in self._queue_entries))
2005 # don't crash here, just assume true
2006 return True
2007 return was_aborted
2008
2009
2010 def _determine_final_status(self):
2011 if self._job_was_aborted():
2012 return models.HostQueueEntry.Status.ABORTED
2013
2014 # we'll use a PidfileRunMonitor to read the autoserv exit status
2015 if self._autoserv_monitor.exit_code() == 0:
2016 return models.HostQueueEntry.Status.COMPLETED
2017 return models.HostQueueEntry.Status.FAILED
2018
2019
2020 def run(self):
showard5add1c82009-05-26 19:27:46 +00002021 # make sure we actually have results to work with.
2022 # this should never happen in normal operation.
2023 if not self._autoserv_monitor.has_process():
2024 email_manager.manager.enqueue_notify_email(
2025 'No results in post-job task',
2026 'No results in post-job task at %s' %
2027 self._autoserv_monitor.pidfile_id)
2028 self.finished(False)
2029 return
2030
2031 super(PostJobTask, self).run(
2032 pidfile_name=self._pidfile_name,
2033 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00002034
2035
2036 def _set_all_statuses(self, status):
2037 for queue_entry in self._queue_entries:
2038 queue_entry.set_status(status)
2039
2040
2041 def abort(self):
2042 # override AgentTask.abort() to avoid killing the process and ending
2043 # the task. post-job tasks continue when the job is aborted.
2044 pass
2045
2046
showardb5626452009-06-30 01:57:28 +00002047class GatherLogsTask(PostJobTask, CleanupHostsMixin):
showardd3dc1992009-04-22 21:01:40 +00002048 """
2049 Task responsible for
2050 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2051 * copying logs to the results repository
2052 * spawning CleanupTasks for hosts, if necessary
2053 * spawning a FinalReparseTask for the job
2054 """
showarded2afea2009-07-07 20:54:07 +00002055 def __init__(self, job, queue_entries, recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002056 self._job = job
2057 super(GatherLogsTask, self).__init__(
2058 queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
showarded2afea2009-07-07 20:54:07 +00002059 logfile_name='.collect_crashinfo.log',
2060 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002061 self._set_ids(queue_entries=queue_entries)
2062
2063
2064 def _generate_command(self, results_dir):
2065 host_list = ','.join(queue_entry.host.hostname
2066 for queue_entry in self._queue_entries)
2067        return [_autoserv_path, '-p', '--collect-crashinfo', '-m', host_list,
2068 '-r', results_dir]
2069
2070
2071 def prolog(self):
2072 super(GatherLogsTask, self).prolog()
2073 self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)
2074
2075
showardd3dc1992009-04-22 21:01:40 +00002076 def epilog(self):
2077 super(GatherLogsTask, self).epilog()
showardebc0fb72009-05-13 21:28:07 +00002078 if self._autoserv_monitor.has_process():
2079 self._copy_and_parse_results(self._queue_entries,
2080 use_monitor=self._autoserv_monitor)
showardb5626452009-06-30 01:57:28 +00002081
2082 final_success = (
2083 self._final_status == models.HostQueueEntry.Status.COMPLETED)
2084 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2085 self._reboot_hosts(self._job, self._queue_entries, final_success,
2086 num_tests_failed)
showardd3dc1992009-04-22 21:01:40 +00002087
2088
showard0bbfc212009-04-29 21:06:13 +00002089 def run(self):
showard597bfd32009-05-08 18:22:50 +00002090 autoserv_exit_code = self._autoserv_monitor.exit_code()
2091 # only run if Autoserv exited due to some signal. if we have no exit
2092 # code, assume something bad (and signal-like) happened.
2093 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002094 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002095 else:
2096 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002097
2098
showard8fe93b52008-11-18 17:53:22 +00002099class CleanupTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002100 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2101
2102
2103 def __init__(self, host=None, queue_entry=None, task=None,
2104 recover_run_monitor=None):
showardfa8629c2008-11-04 16:51:23 +00002105 assert bool(host) ^ bool(queue_entry)
2106 if queue_entry:
2107 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00002108 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00002109 self.host = host
showard170873e2009-01-07 00:22:26 +00002110
showarde788ea62008-11-17 21:02:47 +00002111 repair_task = RepairTask(host, queue_entry=queue_entry)
showarded2afea2009-07-07 20:54:07 +00002112 super(CleanupTask, self).__init__(
2113 task, ['--cleanup'], failure_tasks=[repair_task],
2114 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002115
2116 self._set_ids(host=host, queue_entries=[queue_entry])
mbligh16c722d2008-03-05 00:58:44 +00002117
mblighd5c95802008-03-05 00:33:46 +00002118
jadmanski0afbb632008-06-06 21:10:57 +00002119 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002120 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002121 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00002122 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00002123
mblighd5c95802008-03-05 00:33:46 +00002124
showard21baa452008-10-21 00:08:39 +00002125 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002126 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002127
showard21baa452008-10-21 00:08:39 +00002128 if self.success:
showardfa8629c2008-11-04 16:51:23 +00002129 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00002130 self.host.update_field('dirty', 0)
2131
2132
showardd3dc1992009-04-22 21:01:40 +00002133class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002134 _num_running_parses = 0
2135
showarded2afea2009-07-07 20:54:07 +00002136 def __init__(self, queue_entries, recover_run_monitor=None):
2137 super(FinalReparseTask, self).__init__(
2138 queue_entries, pidfile_name=_PARSER_PID_FILE,
2139 logfile_name='.parse.log',
2140 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002141 # don't use _set_ids, since we don't want to set the host_ids
2142 self.queue_entry_ids = [entry.id for entry in queue_entries]
showarded2afea2009-07-07 20:54:07 +00002143 self._parse_started = self.started
showard97aed502008-11-04 02:01:24 +00002144
showard97aed502008-11-04 02:01:24 +00002145
2146 @classmethod
2147 def _increment_running_parses(cls):
2148 cls._num_running_parses += 1
2149
2150
2151 @classmethod
2152 def _decrement_running_parses(cls):
2153 cls._num_running_parses -= 1
2154
2155
2156 @classmethod
2157 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002158 return (cls._num_running_parses <
2159 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002160
2161
2162 def prolog(self):
2163 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002164 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00002165
2166
2167 def epilog(self):
2168 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002169 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002170
2171
showardd3dc1992009-04-22 21:01:40 +00002172 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002173 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002174 results_dir]
showard97aed502008-11-04 02:01:24 +00002175
2176
showard08a36412009-05-05 01:01:13 +00002177 def tick(self):
2178 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002179 # and we can, at which point we revert to default behavior
2180 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002181 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002182 else:
2183 self._try_starting_parse()
2184
2185
2186 def run(self):
2187 # override run() to not actually run unless we can
2188 self._try_starting_parse()
2189
2190
2191 def _try_starting_parse(self):
2192 if not self._can_run_new_parse():
2193 return
showard170873e2009-01-07 00:22:26 +00002194
showard97aed502008-11-04 02:01:24 +00002195 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002196 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002197
showard97aed502008-11-04 02:01:24 +00002198 self._increment_running_parses()
2199 self._parse_started = True
2200
2201
2202 def finished(self, success):
2203 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002204 if self._parse_started:
2205 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002206
2207
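# Note on the throttle above: _num_running_parses is class-level state shared
# by all FinalReparseTask instances, so at most
# scheduler_config.config.max_parse_processes parses run at once; a task that
# cannot start yet simply retries on every tick().

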
showardc9ae1782009-01-30 01:42:37 +00002208class SetEntryPendingTask(AgentTask):
2209 def __init__(self, queue_entry):
2210 super(SetEntryPendingTask, self).__init__(cmd='')
2211 self._queue_entry = queue_entry
2212 self._set_ids(queue_entries=[queue_entry])
2213
2214
2215 def run(self):
2216 agent = self._queue_entry.on_pending()
2217 if agent:
2218 self.agent.dispatcher.add_agent(agent)
2219 self.finished(True)
2220
2221
showarda3c58572009-03-12 20:36:59 +00002222class DBError(Exception):
2223 """Raised by the DBObject constructor when its select fails."""
2224
2225
mbligh36768f02008-02-22 18:28:33 +00002226class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002227 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002228
2229 # Subclasses MUST override these:
2230 _table_name = ''
2231 _fields = ()
2232
showarda3c58572009-03-12 20:36:59 +00002233 # A mapping from (type, id) to the instance of the object for that
2234 # particular id. This prevents us from creating new Job() and Host()
2235 # instances for every HostQueueEntry object that we instantiate as
2236 # multiple HQEs often share the same Job.
2237 _instances_by_type_and_id = weakref.WeakValueDictionary()
2238 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002239
showarda3c58572009-03-12 20:36:59 +00002240
2241 def __new__(cls, id=None, **kwargs):
2242 """
2243 Look to see if we already have an instance for this particular type
2244 and id. If so, use it instead of creating a duplicate instance.
2245 """
2246 if id is not None:
2247 instance = cls._instances_by_type_and_id.get((cls, id))
2248 if instance:
2249 return instance
2250 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2251
2252
2253 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00002254 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00002255 assert self._table_name, '_table_name must be defined in your class'
2256 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002257 if not new_record:
2258 if self._initialized and not always_query:
2259 return # We've already been initialized.
2260 if id is None:
2261 id = row[0]
2262 # Tell future constructors to use us instead of re-querying while
2263 # this instance is still around.
2264 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002265
showard6ae5ea92009-02-25 00:11:51 +00002266 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002267
jadmanski0afbb632008-06-06 21:10:57 +00002268 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002269
jadmanski0afbb632008-06-06 21:10:57 +00002270 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002271 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002272
showarda3c58572009-03-12 20:36:59 +00002273 if self._initialized:
2274 differences = self._compare_fields_in_row(row)
2275 if differences:
showard7629f142009-03-27 21:02:02 +00002276 logging.warn(
2277                    'requery of initialized %s instance (id=%s) is updating: %s',
2278 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002279 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002280 self._initialized = True
2281
2282
2283 @classmethod
2284 def _clear_instance_cache(cls):
2285 """Used for testing, clear the internal instance cache."""
2286 cls._instances_by_type_and_id.clear()
2287
2288
showardccbd6c52009-03-21 00:10:21 +00002289 def _fetch_row_from_db(self, row_id):
2290 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2291 rows = _db.execute(sql, (row_id,))
2292 if not rows:
showard76e29d12009-04-15 21:53:10 +00002293 raise DBError("row not found (table=%s, row id=%s)"
2294 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002295 return rows[0]
2296
2297
showarda3c58572009-03-12 20:36:59 +00002298 def _assert_row_length(self, row):
2299 assert len(row) == len(self._fields), (
2300 "table = %s, row = %s/%d, fields = %s/%d" % (
2301 self.__table, row, len(row), self._fields, len(self._fields)))
2302
2303
2304 def _compare_fields_in_row(self, row):
2305 """
2306 Given a row as returned by a SELECT query, compare it to our existing
2307 in memory fields.
2308
2309 @param row - A sequence of values corresponding to fields named in
2310 The class attribute _fields.
2311
2312 @returns A dictionary listing the differences keyed by field name
2313 containing tuples of (current_value, row_value).
2314 """
2315 self._assert_row_length(row)
2316 differences = {}
2317 for field, row_value in itertools.izip(self._fields, row):
2318 current_value = getattr(self, field)
2319 if current_value != row_value:
2320 differences[field] = (current_value, row_value)
2321 return differences
showard2bab8f42008-11-12 18:15:22 +00002322
2323
2324 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002325 """
2326 Update our field attributes using a single row returned by SELECT.
2327
2328 @param row - A sequence of values corresponding to fields named in
2329 the class fields list.
2330 """
2331 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002332
showard2bab8f42008-11-12 18:15:22 +00002333 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002334 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002335 setattr(self, field, value)
2336 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002337
showard2bab8f42008-11-12 18:15:22 +00002338 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002339
mblighe2586682008-02-29 22:45:46 +00002340
showardccbd6c52009-03-21 00:10:21 +00002341 def update_from_database(self):
2342 assert self.id is not None
2343 row = self._fetch_row_from_db(self.id)
2344 self._update_fields_from_row(row)
2345
2346
jadmanski0afbb632008-06-06 21:10:57 +00002347    def count(self, where, table=None):
2348 if not table:
2349 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002350
jadmanski0afbb632008-06-06 21:10:57 +00002351 rows = _db.execute("""
2352 SELECT count(*) FROM %s
2353 WHERE %s
2354 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002355
jadmanski0afbb632008-06-06 21:10:57 +00002356 assert len(rows) == 1
2357
2358 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002359
2360
showardd3dc1992009-04-22 21:01:40 +00002361 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002362 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002363
showard2bab8f42008-11-12 18:15:22 +00002364 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002365 return
mbligh36768f02008-02-22 18:28:33 +00002366
mblighf8c624d2008-07-03 16:58:45 +00002367 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002368 _db.execute(query, (value, self.id))
2369
showard2bab8f42008-11-12 18:15:22 +00002370 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002371
2372
jadmanski0afbb632008-06-06 21:10:57 +00002373 def save(self):
2374 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002375 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002376 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002377 values = []
2378 for key in keys:
2379 value = getattr(self, key)
2380 if value is None:
2381 values.append('NULL')
2382 else:
2383 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002384 values_str = ','.join(values)
2385 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2386 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002387 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002388 # Update our id to the one the database just assigned to us.
2389 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002390
2391
jadmanski0afbb632008-06-06 21:10:57 +00002392 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002393        self._instances_by_type_and_id.pop((type(self), self.id), None)
2394 self._initialized = False
2395 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002396 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2397 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002398
2399
showard63a34772008-08-18 19:32:50 +00002400 @staticmethod
2401 def _prefix_with(string, prefix):
2402 if string:
2403 string = prefix + string
2404 return string
2405
2406
jadmanski0afbb632008-06-06 21:10:57 +00002407 @classmethod
showard989f25d2008-10-01 11:38:11 +00002408 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002409 """
2410 Construct instances of our class based on the given database query.
2411
2412 @yields One class instance for each row fetched.
2413 """
showard63a34772008-08-18 19:32:50 +00002414 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2415 where = cls._prefix_with(where, 'WHERE ')
2416 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002417 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002418 'joins' : joins,
2419 'where' : where,
2420 'order_by' : order_by})
2421 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002422 for row in rows:
2423 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002424
mbligh36768f02008-02-22 18:28:33 +00002425
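    # Usage sketch for fetch() above (illustrative only): iterate all
    # unlocked, valid hosts ordered by name.  'locked' and 'invalid' are
    # columns declared in Host._fields below.
    #
    #     for host in Host.fetch(where='locked = 0 AND invalid = 0',
    #                            order_by='hostname'):
    #         logging.info('eligible host: %s', host.hostname)

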
class IneligibleHostQueue(DBObject):
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')


class AtomicGroup(DBObject):
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')


class Label(DBObject):
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)


class Host(DBObject):
    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def set_status(self, status):
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


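    # Expected return shape (derived from the query above): for a host with
    # a 'netbook' platform label and a plain 'bluetooth' label,
    #
    #     platform_and_labels() == ('netbook', ['bluetooth', 'netbook'])
    #
    # since labels come back alphabetically and the platform label is also
    # included in the full label list.

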
    def reverify_tasks(self):
        cleanup_task = CleanupTask(host=self)
        verify_task = VerifyTask(host=self)

        # just to make sure this host does not get taken away
        self.set_status('Cleaning')
        return [cleanup_task, verify_task]


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If either hostname does not match this pattern, the two are
        simply compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfies most people's hostname sorting needs
        regardless of their exact naming schemes.  Nobody sane should have
        both a host10 and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # int() already ignores leading zeros ('010' -> 10); stripping
            # them first would break on an all-zero suffix such as 'host0',
            # where int('') raises ValueError.
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)


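    # Usage sketch (illustrative): this comparator plugs straight into
    # list.sort(), either directly on Host objects or via a small adapter as
    # Job._choose_group_to_run() does further below:
    #
    #     hosts.sort(cmp=Host.cmp_for_sort)

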
class HostQueueEntry(DBObject):
    _table_name = 'host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')


    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None

        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN jobs_dependency_labels AS deps "
                      "ON (labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


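    # Usage sketch (illustrative): since get_labels() may yield duplicates,
    # callers wanting unique names can dedupe with a set:
    #
    #     label_names = set(label.name for label in entry.get_labels())

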
    def set_host(self, host):
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        return self.host


    def queue_log_record(self, log_line):
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
                'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        if subdir is None:
            assert self.get_host()
            subdir = self.get_host().hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)


    def set_status(self, status):
        self.update_field('status', status)

        logging.info("%s -> %s", self, self.status)

        if status in ['Queued', 'Parsing']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Gathering']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _email_on_status(self, status):
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def run_pre_job_tasks(self, assigned_host=None):
        if self.meta_host is not None or self.atomic_group:
            assert assigned_host
            # ensure results dir exists for the queue log
            if self.host_id is None:
                self.set_host(assigned_host)
            else:
                assert assigned_host.id == self.host_id

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        return self._do_run_pre_job_tasks()


    def _do_run_pre_job_tasks(self):
        # Every host goes through the Verifying stage (which may or may not
        # actually do anything, as determined by get_pre_job_tasks).
        self.set_status(models.HostQueueEntry.Status.VERIFYING)

        # The pre job tasks always end with a SetEntryPendingTask which
        # will continue as appropriate through queue_entry.on_pending().
        return Agent(self.job.get_pre_job_tasks(queue_entry=self))


    def requeue(self):
        assert self.host
        self.set_status('Queued')
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        self.job.stop_if_necessary()


    @property
    def aborted_by(self):
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, returns an agent to run the job.  Returns None
        otherwise.
        """
        self.set_status('Pending')
        self.get_host().set_status('Pending')

        # Some debug code here: sends an email if an asynchronous job does
        # not immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are
        # getting stuck in Pending.
        agent = self.job.run_if_ready(queue_entry=self)
        if self.job.synch_count == 1 and agent is None:
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)
        return agent


    def abort(self, dispatcher):
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        has_running_job_agent = (
                self.status in (Status.RUNNING, Status.GATHERING,
                                Status.PARSING)
                and dispatcher.get_agents_for_entry(self))
        if has_running_job_agent:
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING):
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))

        self.set_status(Status.ABORTED)


    def execution_tag(self):
        assert self.execution_subdir
        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        return self.execution_tag()


class Job(DBObject):
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs')

    # This does not need to be a column in the DB.  The delays are likely to
    # be configured short.  If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it.  Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time.  Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending()
    # on all status='Pending' atomic group HQEs in case a delay was running
    # when the scheduler was restarted and no more hosts ever successfully
    # exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)


    def is_server_job(self):
        # In the AFE models a control_type of 2 denotes a client-side job;
        # anything else is run server-side via autoserv.
        return self.control_type != 2


    def tag(self):
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id = %s
                """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries) > 0

        return entries


    def set_status(self, status, update_queues=False):
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def _atomic_and_has_started(self):
        """
        @returns True if any of the HostQueueEntries associated with this job
        have entered the Status.STARTING state or beyond.
        """
        atomic_entries = models.HostQueueEntry.objects.filter(
                job=self.id, atomic_group__isnull=False)
        if atomic_entries.count() <= 0:
            return False

        # These states may *only* be reached if Job.run() has been called.
        started_statuses = (models.HostQueueEntry.Status.STARTING,
                            models.HostQueueEntry.Status.RUNNING,
                            models.HostQueueEntry.Status.COMPLETED)

        started_entries = atomic_entries.filter(status__in=started_statuses)
        return started_entries.count() > 0


    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()


    def is_ready(self):
        # NOTE: Atomic group jobs stop reporting ready after they have been
        # started to avoid launching multiple copies of one atomic job.
        # Only possible if synch_count is less than half the number of
        # machines in the atomic group.
        pending_count = self._pending_count()
        atomic_and_has_started = self._atomic_and_has_started()
        ready = (pending_count >= self.synch_count
                 and not atomic_and_has_started)

        if not ready:
            logging.info(
                    'Job %s not ready: %s pending, %s required '
                    '(Atomic and started: %s)',
                    self, pending_count, self.synch_count,
                    atomic_and_has_started)

        return ready


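    # Worked example (illustrative): a synchronous job with synch_count=3
    # and only two entries in Pending is not ready; the log line above would
    # read "Job 42-someuser not ready: 2 pending, 3 required (Atomic and
    # started: False)", using the Job.__str__() form defined below.

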
    def num_machines(self, clause=None):
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        entries_to_stop = self._not_yet_run_entries(
                include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                    '%s status=%s, active=%s, complete=%s' %
                    (child_entry.id, child_entry.status, child_entry.active,
                     child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        hostname = queue_entry.get_host().hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self, group_name=''):
        """@returns a directory name to use for the next host group results."""
        if group_name:
            # Sanitize for use as a pathname.
            group_name = group_name.replace(os.path.sep, '_')
            if group_name.startswith('.'):
                group_name = '_' + group_name[1:]
            # Add a separator between the group name and 'group%d'.
            group_name += '.'
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
                job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)


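    # Naming sketch (derived from the method above): with group_name='rack1'
    # and existing execution_subdirs 'rack1.group0' and 'rack1.group1', this
    # returns 'rack1.group2'; with no group_name and no prior groups it
    # returns 'group0'.

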
    def _write_control_file(self, execution_tag):
        control_path = _drone_manager.attach_file_to_execution(
                execution_tag, self.control_file)
        return control_path


    def get_group_entries(self, queue_entry_from_group):
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
                where='job_id=%s AND execution_subdir=%s',
                params=(self.id, execution_subdir)))


    def _get_autoserv_params(self, queue_entries):
        assert queue_entries
        execution_tag = queue_entries[0].execution_tag()
        control_path = self._write_control_file(execution_tag)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = _autoserv_command_line(
                hostnames,
                ['-P', execution_tag, '-n',
                 _drone_manager.absolute_path(control_path)],
                job=self, verbose=False)

        if not self.is_server_job():
            params.append('-c')

        return params


    def _should_run_cleanup(self, queue_entry):
        if self.reboot_before == models.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == models.RebootBefore.IF_DIRTY:
            return queue_entry.get_host().dirty
        return False


    def should_run_verify(self, queue_entry):
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def get_pre_job_tasks(self, queue_entry):
        """
        Get a list of tasks to perform before the host_queue_entry
        may be used to run this Job (such as Cleanup & Verify).

        @returns A list of tasks to be done to the given queue_entry before
                 it should be considered ready to run this job.  The last
                 task in the list calls HostQueueEntry.on_pending(), which
                 continues the flow of the job.
        """
        tasks = []
        if self._should_run_cleanup(queue_entry):
            tasks.append(CleanupTask(queue_entry=queue_entry))
        if self.should_run_verify(queue_entry):
            tasks.append(VerifyTask(queue_entry=queue_entry))
        tasks.append(SetEntryPendingTask(queue_entry))
        return tasks


    def _assign_new_group(self, queue_entries, group_name=''):
        if len(queue_entries) == 1:
            group_subdir_name = queue_entries[0].get_host().hostname
        else:
            group_subdir_name = self._next_group_name(group_name)
            logging.info('Running synchronous job %d hosts %s as %s',
                         self.id,
                         [entry.host.hostname for entry in queue_entries],
                         group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A tuple containing a list of HostQueueEntry instances to be
                 used to run this Job and a string group name to suggest
                 giving to this job in the results database.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                    where=where_clause,
                    params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        assert len(chosen_entries) >= self.synch_count

        if atomic_group:
            # Look at any meta_host and dependency labels and pick the first
            # one that also specifies this atomic group.  Use that label name
            # as the group name if possible (it is more specific).
            group_name = atomic_group.name
            for label in include_queue_entry.get_labels():
                if label.atomic_group_id:
                    assert label.atomic_group_id == atomic_group.id
                    group_name = label.name
                    break
        else:
            group_name = ''

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries, group_name


    def run_if_ready(self, queue_entry):
        """
        @returns An Agent instance to ultimately run this job if enough hosts
                 are ready for it to run.
        @returns None, potentially after cleaning up excess hosts, if this
                 Job is not ready to run.
        """
        if not self.is_ready():
            self.stop_if_necessary()
            return None

        if queue_entry.atomic_group:
            return self.run_with_ready_delay(queue_entry)

        return self.run(queue_entry)


    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, the delay cannot be reset.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run_if_ready when the delay is up.

        @returns An Agent to run the job as appropriate or None if a delay
                has already been set.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        pending_threshold = queue_entry.atomic_group.max_number_of_machines
        over_max_threshold = (self._pending_count() >= pending_threshold)
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough?  Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            return self.run(queue_entry)

        # A delay was previously scheduled.
        if self._delay_ready_task:
            return None

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self.id)
            return self.run(queue_entry)

        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)

        return Agent([self._delay_ready_task], num_processes=0)


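    # Configuration sketch (an assumption about deployment, not part of this
    # module): the delay above is read from the scheduler's global_config
    # setting secs_to_wait_for_atomic_group_hosts.  A falsy value such as 0
    # disables the wait entirely, so atomic group jobs launch as soon as
    # is_ready() holds.

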
    def run(self, queue_entry):
        """
        @param queue_entry: The HostQueueEntry instance calling this method.
        @returns An Agent instance to run this job or None if we've already
                 been run.
        """
        if queue_entry.atomic_group and self._atomic_and_has_started():
            logging.error('Job.run() called on running atomic Job %d '
                          'with HQE %s.', self.id, queue_entry)
            return None
        queue_entries, group_name = self._choose_group_to_run(queue_entry)
        return self._finish_run(queue_entries, group_name)


    def _finish_run(self, queue_entries, group_name):
        for queue_entry in queue_entries:
            queue_entry.set_status('Starting')
        params = self._get_autoserv_params(queue_entries)
        queue_task = QueueTask(job=self, queue_entries=queue_entries,
                               cmd=params, group_name=group_name)
        tasks = [queue_task]
        if self._delay_ready_task:
            # Cancel any pending callback that would try to run again
            # as we are already running.
            self._delay_ready_task.abort()

        return Agent(tasks, num_processes=len(queue_entries))


    def __str__(self):
        return '%s-%s' % (self.id, self.owner)


if __name__ == '__main__':
    main()