blob: 2e4c4b0cb22769cab6db8624d25d12a7a6bedad1 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
mblighb090f142008-02-27 21:33:46 +000023
mbligh36768f02008-02-22 18:28:33 +000024RESULTS_DIR = '.'
25AUTOSERV_NICE_LEVEL = 10
showard170873e2009-01-07 00:22:26 +000026DB_CONFIG_SECTION = 'AUTOTEST_WEB'
mbligh36768f02008-02-22 18:28:33 +000027AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
28
29if os.environ.has_key('AUTOTEST_DIR'):
jadmanski0afbb632008-06-06 21:10:57 +000030 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
mbligh36768f02008-02-22 18:28:33 +000031AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
32AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
33
34if AUTOTEST_SERVER_DIR not in sys.path:
jadmanski0afbb632008-06-06 21:10:57 +000035 sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000036
mbligh90a549d2008-03-25 23:52:34 +000037# how long to wait for autoserv to write a pidfile
38PIDFILE_TIMEOUT = 5 * 60 # 5 min
mblighbb421852008-03-11 22:36:16 +000039
showardd3dc1992009-04-22 21:01:40 +000040_AUTOSERV_PID_FILE = '.autoserv_execute'
41_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
42_PARSER_PID_FILE = '.parser_execute'
43
44_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
45 _PARSER_PID_FILE)
46
showard35162b02009-03-03 02:17:30 +000047# error message to leave in results dir when an autoserv process disappears
48# mysteriously
49_LOST_PROCESS_ERROR = """\
50Autoserv failed abnormally during execution for this job, probably due to a
51system error on the Autotest server. Full results may not be available. Sorry.
52"""
53
mbligh6f8bab42008-02-29 22:45:14 +000054_db = None
mbligh36768f02008-02-22 18:28:33 +000055_shutdown = False
showard170873e2009-01-07 00:22:26 +000056_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
57_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
mbligh4314a712008-02-29 22:44:30 +000058_testing_mode = False
showard542e8402008-09-19 20:16:18 +000059_base_url = None
showardc85c21b2008-11-24 22:17:37 +000060_notify_email_statuses = []
showard170873e2009-01-07 00:22:26 +000061_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000062
63
mbligh83c1e9e2009-05-01 23:10:41 +000064def _site_init_monitor_db_dummy():
65 return {}
66
67
mbligh36768f02008-02-22 18:28:33 +000068def main():
showard27f33872009-04-07 18:20:53 +000069 try:
70 main_without_exception_handling()
showard29caa4b2009-05-26 19:27:09 +000071 except SystemExit:
72 raise
showard27f33872009-04-07 18:20:53 +000073 except:
74 logging.exception('Exception escaping in monitor_db')
75 raise
76
77
def main_without_exception_handling():
    """Parse arguments, configure globals, and run the dispatcher loop.

    Expects exactly one positional argument: the results directory.
    Populates the module globals (RESULTS_DIR, _notify_email_statuses,
    _autoserv_path, _testing_mode, _base_url), starts the status server,
    then ticks the Dispatcher until _shutdown is set by SIGINT.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    # Refuse to run at all unless explicitly enabled in the global config.
    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enabled it. Exiting.")
        print msg
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Give site code a chance to customize the scheduler; falls back to the
    # no-op dummy when no site_monitor_db module is present.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # The config value is a free-form list; accept space/comma/semicolon/colon
    # separators and drop empty entries.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # Any uncaught exception is emailed to the admins; we still fall
        # through to the orderly shutdown below.
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000165
166
showard136e6dc2009-06-10 19:38:49 +0000167def setup_logging():
168 log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
169 log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
170 logging_manager.configure_logging(
171 scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
172 logfile_name=log_name)
173
174
mbligh36768f02008-02-22 18:28:33 +0000175def handle_sigint(signum, frame):
jadmanski0afbb632008-06-06 21:10:57 +0000176 global _shutdown
177 _shutdown = True
showardb18134f2009-03-20 20:52:18 +0000178 logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000179
180
showard136e6dc2009-06-10 19:38:49 +0000181def init():
showardb18134f2009-03-20 20:52:18 +0000182 logging.info("%s> dispatcher starting", time.strftime("%X %x"))
183 logging.info("My PID is %d", os.getpid())
mbligh36768f02008-02-22 18:28:33 +0000184
mblighfb676032009-04-01 18:25:38 +0000185 utils.write_pid("monitor_db")
186
showardb1e51872008-10-07 11:08:18 +0000187 if _testing_mode:
188 global_config.global_config.override_config_value(
showard170873e2009-01-07 00:22:26 +0000189 DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')
showardb1e51872008-10-07 11:08:18 +0000190
jadmanski0afbb632008-06-06 21:10:57 +0000191 os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
192 global _db
showard170873e2009-01-07 00:22:26 +0000193 _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
jadmanski0afbb632008-06-06 21:10:57 +0000194 _db.connect()
mbligh36768f02008-02-22 18:28:33 +0000195
showardfa8629c2008-11-04 16:51:23 +0000196 # ensure Django connection is in autocommit
197 setup_django_environment.enable_autocommit()
showard844960a2009-05-29 18:41:18 +0000198 # bypass the readonly connection
199 readonly_connection.ReadOnlyConnection.set_globally_disabled(True)
showardfa8629c2008-11-04 16:51:23 +0000200
showardb18134f2009-03-20 20:52:18 +0000201 logging.info("Setting signal handler")
jadmanski0afbb632008-06-06 21:10:57 +0000202 signal.signal(signal.SIGINT, handle_sigint)
203
showardd1ee1dd2009-01-07 21:33:08 +0000204 drones = global_config.global_config.get_config_value(
205 scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
206 drone_list = [hostname.strip() for hostname in drones.split(',')]
showard170873e2009-01-07 00:22:26 +0000207 results_host = global_config.global_config.get_config_value(
showardd1ee1dd2009-01-07 21:33:08 +0000208 scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
showard170873e2009-01-07 00:22:26 +0000209 _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)
210
showardb18134f2009-03-20 20:52:18 +0000211 logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000212
213
showard87ba02a2009-04-20 19:37:32 +0000214def _autoserv_command_line(machines, results_dir, extra_args, job=None,
showarde9c69362009-06-30 01:58:03 +0000215 queue_entry=None, verbose=True):
showardf1ae3542009-05-11 19:26:02 +0000216 """
217 @returns The autoserv command line as a list of executable + parameters.
218
219 @param machines - string - A machine or comma separated list of machines
220 for the (-m) flag.
221 @param results_dir - string - Where the results will be written (-r).
222 @param extra_args - list - Additional arguments to pass to autoserv.
223 @param job - Job object - If supplied, -u owner and -l name parameters
224 will be added.
225 @param queue_entry - A HostQueueEntry object - If supplied and no Job
226 object was supplied, this will be used to lookup the Job object.
227 """
showard87ba02a2009-04-20 19:37:32 +0000228 autoserv_argv = [_autoserv_path, '-p', '-m', machines,
229 '-r', _drone_manager.absolute_path(results_dir)]
230 if job or queue_entry:
231 if not job:
232 job = queue_entry.job
233 autoserv_argv += ['-u', job.owner, '-l', job.name]
showarde9c69362009-06-30 01:58:03 +0000234 if verbose:
235 autoserv_argv.append('--verbose')
showard87ba02a2009-04-20 19:37:32 +0000236 return autoserv_argv + extra_args
237
238
showard89f84db2009-03-12 20:39:13 +0000239class SchedulerError(Exception):
240 """Raised by HostScheduler when an inconsistent state occurs."""
241
242
showard63a34772008-08-18 19:32:50 +0000243class HostScheduler(object):
244 def _get_ready_hosts(self):
245 # avoid any host with a currently active queue entry against it
246 hosts = Host.fetch(
247 joins='LEFT JOIN host_queue_entries AS active_hqe '
248 'ON (hosts.id = active_hqe.host_id AND '
showardb1e51872008-10-07 11:08:18 +0000249 'active_hqe.active)',
showard63a34772008-08-18 19:32:50 +0000250 where="active_hqe.host_id IS NULL "
showardb1e51872008-10-07 11:08:18 +0000251 "AND NOT hosts.locked "
showard63a34772008-08-18 19:32:50 +0000252 "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
253 return dict((host.id, host) for host in hosts)
254
255
256 @staticmethod
257 def _get_sql_id_list(id_list):
258 return ','.join(str(item_id) for item_id in id_list)
259
260
261 @classmethod
showard989f25d2008-10-01 11:38:11 +0000262 def _get_many2many_dict(cls, query, id_list, flip=False):
mbligh849a0f62008-08-28 20:12:19 +0000263 if not id_list:
264 return {}
showard63a34772008-08-18 19:32:50 +0000265 query %= cls._get_sql_id_list(id_list)
266 rows = _db.execute(query)
showard989f25d2008-10-01 11:38:11 +0000267 return cls._process_many2many_dict(rows, flip)
268
269
270 @staticmethod
271 def _process_many2many_dict(rows, flip=False):
showard63a34772008-08-18 19:32:50 +0000272 result = {}
273 for row in rows:
showard89f84db2009-03-12 20:39:13 +0000274 left_id, right_id = int(row[0]), int(row[1])
showard989f25d2008-10-01 11:38:11 +0000275 if flip:
276 left_id, right_id = right_id, left_id
showard63a34772008-08-18 19:32:50 +0000277 result.setdefault(left_id, set()).add(right_id)
278 return result
279
280
281 @classmethod
282 def _get_job_acl_groups(cls, job_ids):
283 query = """
showardd9ac4452009-02-07 02:04:37 +0000284 SELECT jobs.id, acl_groups_users.aclgroup_id
showard63a34772008-08-18 19:32:50 +0000285 FROM jobs
286 INNER JOIN users ON users.login = jobs.owner
287 INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
288 WHERE jobs.id IN (%s)
289 """
290 return cls._get_many2many_dict(query, job_ids)
291
292
293 @classmethod
294 def _get_job_ineligible_hosts(cls, job_ids):
295 query = """
296 SELECT job_id, host_id
297 FROM ineligible_host_queues
298 WHERE job_id IN (%s)
299 """
300 return cls._get_many2many_dict(query, job_ids)
301
302
303 @classmethod
showard989f25d2008-10-01 11:38:11 +0000304 def _get_job_dependencies(cls, job_ids):
305 query = """
306 SELECT job_id, label_id
307 FROM jobs_dependency_labels
308 WHERE job_id IN (%s)
309 """
310 return cls._get_many2many_dict(query, job_ids)
311
312
313 @classmethod
showard63a34772008-08-18 19:32:50 +0000314 def _get_host_acls(cls, host_ids):
315 query = """
showardd9ac4452009-02-07 02:04:37 +0000316 SELECT host_id, aclgroup_id
showard63a34772008-08-18 19:32:50 +0000317 FROM acl_groups_hosts
318 WHERE host_id IN (%s)
319 """
320 return cls._get_many2many_dict(query, host_ids)
321
322
323 @classmethod
324 def _get_label_hosts(cls, host_ids):
showardfa8629c2008-11-04 16:51:23 +0000325 if not host_ids:
326 return {}, {}
showard63a34772008-08-18 19:32:50 +0000327 query = """
328 SELECT label_id, host_id
329 FROM hosts_labels
330 WHERE host_id IN (%s)
showard989f25d2008-10-01 11:38:11 +0000331 """ % cls._get_sql_id_list(host_ids)
332 rows = _db.execute(query)
333 labels_to_hosts = cls._process_many2many_dict(rows)
334 hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
335 return labels_to_hosts, hosts_to_labels
336
337
338 @classmethod
339 def _get_labels(cls):
340 return dict((label.id, label) for label in Label.fetch())
showard63a34772008-08-18 19:32:50 +0000341
342
343 def refresh(self, pending_queue_entries):
344 self._hosts_available = self._get_ready_hosts()
345
346 relevant_jobs = [queue_entry.job_id
347 for queue_entry in pending_queue_entries]
348 self._job_acls = self._get_job_acl_groups(relevant_jobs)
349 self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
showard989f25d2008-10-01 11:38:11 +0000350 self._job_dependencies = self._get_job_dependencies(relevant_jobs)
showard63a34772008-08-18 19:32:50 +0000351
352 host_ids = self._hosts_available.keys()
353 self._host_acls = self._get_host_acls(host_ids)
showard989f25d2008-10-01 11:38:11 +0000354 self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)
355
356 self._labels = self._get_labels()
showard63a34772008-08-18 19:32:50 +0000357
358
359 def _is_acl_accessible(self, host_id, queue_entry):
360 job_acls = self._job_acls.get(queue_entry.job_id, set())
361 host_acls = self._host_acls.get(host_id, set())
362 return len(host_acls.intersection(job_acls)) > 0
363
364
showard989f25d2008-10-01 11:38:11 +0000365 def _check_job_dependencies(self, job_dependencies, host_labels):
366 missing = job_dependencies - host_labels
showard89f84db2009-03-12 20:39:13 +0000367 return len(missing) == 0
showard989f25d2008-10-01 11:38:11 +0000368
369
370 def _check_only_if_needed_labels(self, job_dependencies, host_labels,
371 queue_entry):
showardade14e22009-01-26 22:38:32 +0000372 if not queue_entry.meta_host:
373 # bypass only_if_needed labels when a specific host is selected
374 return True
375
showard989f25d2008-10-01 11:38:11 +0000376 for label_id in host_labels:
377 label = self._labels[label_id]
378 if not label.only_if_needed:
379 # we don't care about non-only_if_needed labels
380 continue
381 if queue_entry.meta_host == label_id:
382 # if the label was requested in a metahost it's OK
383 continue
384 if label_id not in job_dependencies:
385 return False
386 return True
387
388
showard89f84db2009-03-12 20:39:13 +0000389 def _check_atomic_group_labels(self, host_labels, queue_entry):
390 """
391 Determine if the given HostQueueEntry's atomic group settings are okay
392 to schedule on a host with the given labels.
393
showard6157c632009-07-06 20:19:31 +0000394 @param host_labels: A list of label ids that the host has.
395 @param queue_entry: The HostQueueEntry being considered for the host.
showard89f84db2009-03-12 20:39:13 +0000396
397 @returns True if atomic group settings are okay, False otherwise.
398 """
showard6157c632009-07-06 20:19:31 +0000399 return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
showard89f84db2009-03-12 20:39:13 +0000400 queue_entry.atomic_group_id)
401
402
showard6157c632009-07-06 20:19:31 +0000403 def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
showard89f84db2009-03-12 20:39:13 +0000404 """
405 Return the atomic group label id for a host with the given set of
406 labels if any, or None otherwise. Raises an exception if more than
407 one atomic group are found in the set of labels.
408
showard6157c632009-07-06 20:19:31 +0000409 @param host_labels: A list of label ids that the host has.
410 @param queue_entry: The HostQueueEntry we're testing. Only used for
411 extra info in a potential logged error message.
showard89f84db2009-03-12 20:39:13 +0000412
413 @returns The id of the atomic group found on a label in host_labels
414 or None if no atomic group label is found.
showard89f84db2009-03-12 20:39:13 +0000415 """
showard6157c632009-07-06 20:19:31 +0000416 atomic_labels = [self._labels[label_id] for label_id in host_labels
417 if self._labels[label_id].atomic_group_id is not None]
418 atomic_ids = set(label.atomic_group_id for label in atomic_labels)
showard89f84db2009-03-12 20:39:13 +0000419 if not atomic_ids:
420 return None
421 if len(atomic_ids) > 1:
showard6157c632009-07-06 20:19:31 +0000422 logging.error('More than one Atomic Group on HQE "%s" via: %r',
423 queue_entry, atomic_labels)
424 return atomic_ids.pop()
showard89f84db2009-03-12 20:39:13 +0000425
426
427 def _get_atomic_group_labels(self, atomic_group_id):
428 """
429 Lookup the label ids that an atomic_group is associated with.
430
431 @param atomic_group_id - The id of the AtomicGroup to look up.
432
433 @returns A generator yeilding Label ids for this atomic group.
434 """
435 return (id for id, label in self._labels.iteritems()
436 if label.atomic_group_id == atomic_group_id
437 and not label.invalid)
438
439
showard54c1ea92009-05-20 00:32:58 +0000440 def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
showard89f84db2009-03-12 20:39:13 +0000441 """
442 @param group_hosts - A sequence of Host ids to test for usability
443 and eligibility against the Job associated with queue_entry.
444 @param queue_entry - The HostQueueEntry that these hosts are being
445 tested for eligibility against.
446
447 @returns A subset of group_hosts Host ids that are eligible for the
448 supplied queue_entry.
449 """
450 return set(host_id for host_id in group_hosts
451 if self._is_host_usable(host_id)
452 and self._is_host_eligible_for_job(host_id, queue_entry))
453
454
showard989f25d2008-10-01 11:38:11 +0000455 def _is_host_eligible_for_job(self, host_id, queue_entry):
showard2924b0a2009-06-18 23:16:15 +0000456 if self._is_host_invalid(host_id):
457 # if an invalid host is scheduled for a job, it's a one-time host
458 # and it therefore bypasses eligibility checks. note this can only
459 # happen for non-metahosts, because invalid hosts have their label
460 # relationships cleared.
461 return True
462
showard989f25d2008-10-01 11:38:11 +0000463 job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
464 host_labels = self._host_labels.get(host_id, set())
mblighc993bee2008-10-03 03:42:34 +0000465
showard89f84db2009-03-12 20:39:13 +0000466 return (self._is_acl_accessible(host_id, queue_entry) and
467 self._check_job_dependencies(job_dependencies, host_labels) and
468 self._check_only_if_needed_labels(
469 job_dependencies, host_labels, queue_entry) and
470 self._check_atomic_group_labels(host_labels, queue_entry))
showard989f25d2008-10-01 11:38:11 +0000471
472
showard2924b0a2009-06-18 23:16:15 +0000473 def _is_host_invalid(self, host_id):
474 host_object = self._hosts_available.get(host_id, None)
475 return host_object and host_object.invalid
476
477
showard63a34772008-08-18 19:32:50 +0000478 def _schedule_non_metahost(self, queue_entry):
showard989f25d2008-10-01 11:38:11 +0000479 if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000480 return None
481 return self._hosts_available.pop(queue_entry.host_id, None)
482
483
484 def _is_host_usable(self, host_id):
485 if host_id not in self._hosts_available:
486 # host was already used during this scheduling cycle
487 return False
488 if self._hosts_available[host_id].invalid:
489 # Invalid hosts cannot be used for metahosts. They're included in
490 # the original query because they can be used by non-metahosts.
491 return False
492 return True
493
494
495 def _schedule_metahost(self, queue_entry):
496 label_id = queue_entry.meta_host
497 hosts_in_label = self._label_hosts.get(label_id, set())
498 ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
499 set())
500
501 # must iterate over a copy so we can mutate the original while iterating
502 for host_id in list(hosts_in_label):
503 if not self._is_host_usable(host_id):
504 hosts_in_label.remove(host_id)
505 continue
506 if host_id in ineligible_host_ids:
507 continue
showard989f25d2008-10-01 11:38:11 +0000508 if not self._is_host_eligible_for_job(host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000509 continue
510
showard89f84db2009-03-12 20:39:13 +0000511 # Remove the host from our cached internal state before returning
512 # the host object.
showard63a34772008-08-18 19:32:50 +0000513 hosts_in_label.remove(host_id)
514 return self._hosts_available.pop(host_id)
515 return None
516
517
518 def find_eligible_host(self, queue_entry):
519 if not queue_entry.meta_host:
showard89f84db2009-03-12 20:39:13 +0000520 assert queue_entry.host_id is not None
showard63a34772008-08-18 19:32:50 +0000521 return self._schedule_non_metahost(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000522 assert queue_entry.atomic_group_id is None
showard63a34772008-08-18 19:32:50 +0000523 return self._schedule_metahost(queue_entry)
524
525
showard89f84db2009-03-12 20:39:13 +0000526 def find_eligible_atomic_group(self, queue_entry):
527 """
528 Given an atomic group host queue entry, locate an appropriate group
529 of hosts for the associated job to run on.
530
531 The caller is responsible for creating new HQEs for the additional
532 hosts returned in order to run the actual job on them.
533
534 @returns A list of Host instances in a ready state to satisfy this
535 atomic group scheduling. Hosts will all belong to the same
536 atomic group label as specified by the queue_entry.
537 An empty list will be returned if no suitable atomic
538 group could be found.
539
540 TODO(gps): what is responsible for kicking off any attempted repairs on
541 a group of hosts? not this function, but something needs to. We do
542 not communicate that reason for returning [] outside of here...
543 For now, we'll just be unschedulable if enough hosts within one group
544 enter Repair Failed state.
545 """
546 assert queue_entry.atomic_group_id is not None
547 job = queue_entry.job
548 assert job.synch_count and job.synch_count > 0
showard77182562009-06-10 00:16:05 +0000549 atomic_group = queue_entry.atomic_group
showard89f84db2009-03-12 20:39:13 +0000550 if job.synch_count > atomic_group.max_number_of_machines:
551 # Such a Job and HostQueueEntry should never be possible to
552 # create using the frontend. Regardless, we can't process it.
553 # Abort it immediately and log an error on the scheduler.
554 queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
showard7629f142009-03-27 21:02:02 +0000555 logging.error(
556 'Error: job %d synch_count=%d > requested atomic_group %d '
557 'max_number_of_machines=%d. Aborted host_queue_entry %d.',
558 job.id, job.synch_count, atomic_group.id,
559 atomic_group.max_number_of_machines, queue_entry.id)
showard89f84db2009-03-12 20:39:13 +0000560 return []
561 hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
562 ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
563 set())
564
565 # Look in each label associated with atomic_group until we find one with
566 # enough hosts to satisfy the job.
567 for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
568 group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
569 if queue_entry.meta_host is not None:
570 # If we have a metahost label, only allow its hosts.
571 group_hosts.intersection_update(hosts_in_label)
572 group_hosts -= ineligible_host_ids
showard54c1ea92009-05-20 00:32:58 +0000573 eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
showard89f84db2009-03-12 20:39:13 +0000574 group_hosts, queue_entry)
575
576 # Job.synch_count is treated as "minimum synch count" when
577 # scheduling for an atomic group of hosts. The atomic group
578 # number of machines is the maximum to pick out of a single
579 # atomic group label for scheduling at one time.
580 min_hosts = job.synch_count
581 max_hosts = atomic_group.max_number_of_machines
582
showard54c1ea92009-05-20 00:32:58 +0000583 if len(eligible_host_ids_in_group) < min_hosts:
showard89f84db2009-03-12 20:39:13 +0000584 # Not enough eligible hosts in this atomic group label.
585 continue
586
showard54c1ea92009-05-20 00:32:58 +0000587 eligible_hosts_in_group = [self._hosts_available[id]
588 for id in eligible_host_ids_in_group]
showardef519212009-05-08 02:29:53 +0000589 # So that they show up in a sane order when viewing the job.
showard54c1ea92009-05-20 00:32:58 +0000590 eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)
showardef519212009-05-08 02:29:53 +0000591
showard89f84db2009-03-12 20:39:13 +0000592 # Limit ourselves to scheduling the atomic group size.
593 if len(eligible_hosts_in_group) > max_hosts:
showardef519212009-05-08 02:29:53 +0000594 eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]
showard89f84db2009-03-12 20:39:13 +0000595
596 # Remove the selected hosts from our cached internal state
597 # of available hosts in order to return the Host objects.
598 host_list = []
showard54c1ea92009-05-20 00:32:58 +0000599 for host in eligible_hosts_in_group:
600 hosts_in_label.discard(host.id)
601 self._hosts_available.pop(host.id)
602 host_list.append(host)
showard89f84db2009-03-12 20:39:13 +0000603 return host_list
604
605 return []
606
607
showard170873e2009-01-07 00:22:26 +0000608class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000609 def __init__(self):
610 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000611 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000612 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000613 user_cleanup_time = scheduler_config.config.clean_interval
614 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
615 _db, user_cleanup_time)
616 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000617 self._host_agents = {}
618 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000619
mbligh36768f02008-02-22 18:28:33 +0000620
showard915958d2009-04-22 21:00:58 +0000621 def initialize(self, recover_hosts=True):
622 self._periodic_cleanup.initialize()
623 self._24hr_upkeep.initialize()
624
jadmanski0afbb632008-06-06 21:10:57 +0000625 # always recover processes
626 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000627
jadmanski0afbb632008-06-06 21:10:57 +0000628 if recover_hosts:
629 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000630
631
jadmanski0afbb632008-06-06 21:10:57 +0000632 def tick(self):
showard170873e2009-01-07 00:22:26 +0000633 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000634 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000635 self._find_aborting()
showard1ff7b2e2009-05-15 23:17:18 +0000636 self._find_reverify()
showard29f7cd22009-04-29 21:16:24 +0000637 self._process_recurring_runs()
jadmanski0afbb632008-06-06 21:10:57 +0000638 self._schedule_new_jobs()
639 self._handle_agents()
showard170873e2009-01-07 00:22:26 +0000640 _drone_manager.execute_actions()
641 email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000642
showard97aed502008-11-04 02:01:24 +0000643
mblighf3294cc2009-04-08 21:17:38 +0000644 def _run_cleanup(self):
645 self._periodic_cleanup.run_cleanup_maybe()
646 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000647
mbligh36768f02008-02-22 18:28:33 +0000648
showard170873e2009-01-07 00:22:26 +0000649 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
650 for object_id in object_ids:
651 agent_dict.setdefault(object_id, set()).add(agent)
652
653
654 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
655 for object_id in object_ids:
656 assert object_id in agent_dict
657 agent_dict[object_id].remove(agent)
658
659
jadmanski0afbb632008-06-06 21:10:57 +0000660 def add_agent(self, agent):
661 self._agents.append(agent)
662 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000663 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
664 self._register_agent_for_ids(self._queue_entry_agents,
665 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000666
showard170873e2009-01-07 00:22:26 +0000667
668 def get_agents_for_entry(self, queue_entry):
669 """
670 Find agents corresponding to the specified queue_entry.
671 """
showardd3dc1992009-04-22 21:01:40 +0000672 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000673
674
675 def host_has_agent(self, host):
676 """
677 Determine if there is currently an Agent present using this host.
678 """
679 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000680
681
jadmanski0afbb632008-06-06 21:10:57 +0000682 def remove_agent(self, agent):
683 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000684 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
685 agent)
686 self._unregister_agent_for_ids(self._queue_entry_agents,
687 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000688
689
    def _recover_processes(self):
        """Reconnect to processes left running by a previous scheduler
        instance and clean up anything unrecoverable.  Steps run in a
        fixed order: pidfiles must be registered before the refresh that
        reads them, and drones are reinitialized only after orphan
        processes have been killed."""
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_other_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000700
showard170873e2009-01-07 00:22:26 +0000701
702 def _register_pidfiles(self):
703 # during recovery we may need to read pidfiles for both running and
704 # parsing entries
705 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000706 where="status IN ('Running', 'Gathering', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000707 for queue_entry in queue_entries:
showardd3dc1992009-04-22 21:01:40 +0000708 for pidfile_name in _ALL_PIDFILE_NAMES:
709 pidfile_id = _drone_manager.get_pidfile_id_from(
710 queue_entry.execution_tag(), pidfile_name=pidfile_name)
711 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000712
713
showardd3dc1992009-04-22 21:01:40 +0000714 def _recover_entries_with_status(self, status, orphans, pidfile_name,
715 recover_entries_fn):
716 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000717 for queue_entry in queue_entries:
718 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000719 # synchronous job we've already recovered
720 continue
showardd3dc1992009-04-22 21:01:40 +0000721 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000722 execution_tag = queue_entry.execution_tag()
723 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000724 run_monitor.attach_to_existing_process(execution_tag,
725 pidfile_name=pidfile_name)
showard597bfd32009-05-08 18:22:50 +0000726
727 log_message = ('Recovering %s entry %s ' %
728 (status.lower(),
729 ', '.join(str(entry) for entry in queue_entries)))
showard170873e2009-01-07 00:22:26 +0000730 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000731 # execution apparently never happened
showard597bfd32009-05-08 18:22:50 +0000732 logging.info(log_message + 'without process')
showardd3dc1992009-04-22 21:01:40 +0000733 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000734 continue
mbligh90a549d2008-03-25 23:52:34 +0000735
showard597bfd32009-05-08 18:22:50 +0000736 logging.info(log_message + '(process %s)',
showardd3dc1992009-04-22 21:01:40 +0000737 run_monitor.get_process())
738 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
739 orphans.discard(run_monitor.get_process())
740
741
742 def _kill_remaining_orphan_processes(self, orphans):
743 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000744 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000745 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000746
showard170873e2009-01-07 00:22:26 +0000747
showardd3dc1992009-04-22 21:01:40 +0000748 def _recover_running_entries(self, orphans):
749 def recover_entries(job, queue_entries, run_monitor):
750 if run_monitor is not None:
751 queue_task = RecoveryQueueTask(job=job,
752 queue_entries=queue_entries,
753 run_monitor=run_monitor)
754 self.add_agent(Agent(tasks=[queue_task],
755 num_processes=len(queue_entries)))
756 # else, _requeue_other_active_entries will cover this
757
758 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
759 orphans, '.autoserv_execute',
760 recover_entries)
761
762
763 def _recover_gathering_entries(self, orphans):
764 def recover_entries(job, queue_entries, run_monitor):
765 gather_task = GatherLogsTask(job, queue_entries,
766 run_monitor=run_monitor)
767 self.add_agent(Agent([gather_task]))
768
769 self._recover_entries_with_status(
770 models.HostQueueEntry.Status.GATHERING,
771 orphans, _CRASHINFO_PID_FILE, recover_entries)
772
773
774 def _recover_parsing_entries(self, orphans):
775 def recover_entries(job, queue_entries, run_monitor):
776 reparse_task = FinalReparseTask(queue_entries,
777 run_monitor=run_monitor)
778 self.add_agent(Agent([reparse_task], num_processes=0))
779
780 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
781 orphans, _PARSER_PID_FILE,
782 recover_entries)
783
784
    def _recover_all_recoverable_entries(self):
        """Recover Running, Gathering and Parsing entries plus special
        tasks; any orphaned autoserv process that nothing re-attached to
        is then killed."""
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._recover_special_tasks()
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000792
showard97aed502008-11-04 02:01:24 +0000793
    def _recover_special_tasks(self):
        """\
        Recovers all special tasks that have started running but have not
        completed.
        """

        tasks = models.SpecialTask.objects.filter(is_active=True,
                                                  is_complete=False)
        # Use ordering to force NULL queue_entry_id's to the end of the list
        for task in tasks.order_by('-queue_entry_id'):
            if self.host_has_agent(task.host):
                # Duplicated verify task that we've already recovered
                continue

            logging.info("Recovering %s", task)

            # wrap the Django rows in the scheduler's own DB objects
            host = Host(id=task.host.id)
            queue_entry = None
            if task.queue_entry:
                # fetch yields scheduler objects; .next() takes the single
                # matching entry (Python 2 iterator protocol)
                queue_entry = HostQueueEntry.fetch(
                    where='id = %s', params=(task.queue_entry.id,)).next()

            self._recover_special_task(task, host, queue_entry)
817
818
819 def _recover_special_task(self, task, host, queue_entry):
820 """\
821 Recovers a single special task.
822 """
823 if task.task == models.SpecialTask.Task.VERIFY:
824 agent_tasks = self._recover_verify(task, host, queue_entry)
825 elif task.task == models.SpecialTask.Task.REPAIR:
826 agent_tasks = self._recover_repair(task, host, queue_entry)
827 elif task.task == models.SpecialTask.Task.CLEANUP:
828 agent_tasks = self._recover_cleanup(task, host, queue_entry)
829 else:
830 # Should never happen
831 logging.error(
832 "Special task id %d had invalid task %s", (task.id, task.task))
833
834 self.add_agent(Agent(agent_tasks))
835
836
837 def _recover_verify(self, task, host, queue_entry):
838 """\
839 Recovers a verify task.
840 No associated queue entry: Verify host
841 With associated queue entry: Verify host, and run associated queue
842 entry
843 """
844 if not task.queue_entry:
845 return [VerifyTask(host=host, task=task)]
846 else:
847 return [VerifyTask(queue_entry=queue_entry, task=task),
848 SetEntryPendingTask(queue_entry=queue_entry)]
849
850
851 def _recover_repair(self, task, host, queue_entry):
852 """\
853 Recovers a repair task.
854 Always repair host
855 """
856 return [RepairTask(host=host, queue_entry=queue_entry, task=task)]
857
858
859 def _recover_cleanup(self, task, host, queue_entry):
860 """\
861 Recovers a cleanup task.
862 No associated queue entry: Clean host
863 With associated queue entry: Clean host, verify host if needed, and
864 run associated queue entry
865 """
866 if not task.queue_entry:
867 return [CleanupTask(host=host, task=task)]
868 else:
869 agent_tasks = [CleanupTask(queue_entry=queue_entry,
870 task=task)]
871 if queue_entry.job.should_run_verify(queue_entry):
872 agent_tasks.append(VerifyTask(queue_entry=queue_entry))
873 agent_tasks.append(
874 SetEntryPendingTask(queue_entry=queue_entry))
875 return agent_tasks
876
877
showard170873e2009-01-07 00:22:26 +0000878 def _requeue_other_active_entries(self):
879 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000880 where='active AND NOT complete AND '
881 '(aborted OR status != "Pending")')
showardd3dc1992009-04-22 21:01:40 +0000882
showard2fe3f1d2009-07-06 20:19:11 +0000883 message = '\n'.join(str(entry) for entry in queue_entries
884 if not self.get_agents_for_entry(entry))
885 if message:
886 email_manager.manager.enqueue_notify_email(
887 'Unrecovered active host queue entries exist',
888 message)
showard170873e2009-01-07 00:22:26 +0000889
890
showard1ff7b2e2009-05-15 23:17:18 +0000891 def _find_reverify(self):
showard6d7b2ff2009-06-10 00:16:47 +0000892 tasks = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +0000893 task=models.SpecialTask.Task.VERIFY, is_active=False,
894 is_complete=False, queue_entry__isnull=True)
showard6d7b2ff2009-06-10 00:16:47 +0000895
showard2fe3f1d2009-07-06 20:19:11 +0000896 for task in tasks:
897 host = Host.fetch(where='id = %s', params=(task.host.id,)).next()
898 if host.locked or host.invalid or self.host_has_agent(host):
899 continue
showard6d7b2ff2009-06-10 00:16:47 +0000900
showard2fe3f1d2009-07-06 20:19:11 +0000901 logging.info('Force reverifying host %s', host.hostname)
902 self.add_agent(Agent([VerifyTask(host=host, task=task)]))
showard1ff7b2e2009-05-15 23:17:18 +0000903
904
    def _reverify_remaining_hosts(self):
        """Reverify hosts left in transient states by a scheduler restart."""
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000920
921
jadmanski0afbb632008-06-06 21:10:57 +0000922 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000923 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +0000924 full_where='locked = 0 AND invalid = 0 AND ' + where
925 for host in Host.fetch(where=full_where):
926 if self.host_has_agent(host):
927 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000928 continue
showard170873e2009-01-07 00:22:26 +0000929 if print_message:
showardb18134f2009-03-20 20:52:18 +0000930 logging.info(print_message, host.hostname)
showard2fe3f1d2009-07-06 20:19:11 +0000931 tasks = host.reverify_tasks()
showard170873e2009-01-07 00:22:26 +0000932 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000933
934
jadmanski0afbb632008-06-06 21:10:57 +0000935 def _recover_hosts(self):
936 # recover "Repair Failed" hosts
937 message = 'Reverifying dead host %s'
938 self._reverify_hosts_where("status = 'Repair Failed'",
939 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000940
941
showard04c82c52008-05-29 19:38:12 +0000942
showardb95b1bd2008-08-15 18:11:04 +0000943 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000944 # prioritize by job priority, then non-metahost over metahost, then FIFO
945 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000946 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000947 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000948 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000949
950
showard89f84db2009-03-12 20:39:13 +0000951 def _refresh_pending_queue_entries(self):
952 """
953 Lookup the pending HostQueueEntries and call our HostScheduler
954 refresh() method given that list. Return the list.
955
956 @returns A list of pending HostQueueEntries sorted in priority order.
957 """
showard63a34772008-08-18 19:32:50 +0000958 queue_entries = self._get_pending_queue_entries()
959 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000960 return []
showardb95b1bd2008-08-15 18:11:04 +0000961
showard63a34772008-08-18 19:32:50 +0000962 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000963
showard89f84db2009-03-12 20:39:13 +0000964 return queue_entries
965
966
967 def _schedule_atomic_group(self, queue_entry):
968 """
969 Schedule the given queue_entry on an atomic group of hosts.
970
971 Returns immediately if there are insufficient available hosts.
972
973 Creates new HostQueueEntries based off of queue_entry for the
974 scheduled hosts and starts them all running.
975 """
976 # This is a virtual host queue entry representing an entire
977 # atomic group, find a group and schedule their hosts.
978 group_hosts = self._host_scheduler.find_eligible_atomic_group(
979 queue_entry)
980 if not group_hosts:
981 return
showardcbe6f942009-06-17 19:33:49 +0000982
983 logging.info('Expanding atomic group entry %s with hosts %s',
984 queue_entry,
985 ', '.join(host.hostname for host in group_hosts))
showard89f84db2009-03-12 20:39:13 +0000986 # The first assigned host uses the original HostQueueEntry
987 group_queue_entries = [queue_entry]
988 for assigned_host in group_hosts[1:]:
989 # Create a new HQE for every additional assigned_host.
990 new_hqe = HostQueueEntry.clone(queue_entry)
991 new_hqe.save()
992 group_queue_entries.append(new_hqe)
993 assert len(group_queue_entries) == len(group_hosts)
994 for queue_entry, host in itertools.izip(group_queue_entries,
995 group_hosts):
996 self._run_queue_entry(queue_entry, host)
997
998
999 def _schedule_new_jobs(self):
1000 queue_entries = self._refresh_pending_queue_entries()
1001 if not queue_entries:
1002 return
1003
showard63a34772008-08-18 19:32:50 +00001004 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +00001005 if (queue_entry.atomic_group_id is None or
1006 queue_entry.host_id is not None):
1007 assigned_host = self._host_scheduler.find_eligible_host(
1008 queue_entry)
1009 if assigned_host:
1010 self._run_queue_entry(queue_entry, assigned_host)
1011 else:
1012 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +00001013
1014
1015 def _run_queue_entry(self, queue_entry, host):
showard77182562009-06-10 00:16:05 +00001016 agent = queue_entry.run_pre_job_tasks(assigned_host=host)
1017 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +00001018
1019
    def _find_aborting(self):
        """Abort the agents of, and then abort, every queue entry that has
        been flagged aborted but is not yet complete."""
        for entry in HostQueueEntry.fetch(where='aborted and not complete'):
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001025
1026
showard324bf812009-01-20 23:23:38 +00001027 def _can_start_agent(self, agent, num_started_this_cycle,
1028 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001029 # always allow zero-process agents to run
1030 if agent.num_processes == 0:
1031 return True
1032 # don't allow any nonzero-process agents to run after we've reached a
1033 # limit (this avoids starvation of many-process agents)
1034 if have_reached_limit:
1035 return False
1036 # total process throttling
showard324bf812009-01-20 23:23:38 +00001037 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +00001038 return False
1039 # if a single agent exceeds the per-cycle throttling, still allow it to
1040 # run when it's the first agent in the cycle
1041 if num_started_this_cycle == 0:
1042 return True
1043 # per-cycle throttling
1044 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +00001045 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001046 return False
1047 return True
1048
1049
    def _handle_agents(self):
        """Advance every agent one step: remove finished agents, start
        queued ones subject to process throttling, and tick the rest."""
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    # once one agent is denied, all later nonzero-process
                    # agents are denied too (see _can_start_agent)
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001068
1069
    def _process_recurring_runs(self):
        """Instantiate jobs from recurring-run templates whose start date
        has arrived, then reschedule or delete each recurrence."""
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            # NOTE(review): dependencies is fetched but never passed to
            # create_new_job below -- confirm whether that's intentional
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                # final iteration: drop the recurrence entirely
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1109
1110
class PidfileRunMonitor(object):
    """
    Monitors a process on a drone through its pidfile.

    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # set True once we give up on the process (see on_lost_process())
        self.lost_process = False
        # when monitoring began; used for the pidfile-write timeout
        self._start_time = None
        # drone_manager handle for the pidfile being watched
        self.pidfile_id = None
        # most recently read pidfile contents (process, exit status, ...)
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        # Prefix command with "nice -n <level>"; unchanged for a falsy level.
        # NOTE(review): appears unused -- run() inlines this logic; confirm
        # before removing.
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        # record when monitoring began, for PIDFILE_TIMEOUT checks
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """Start command on a drone and begin monitoring its pidfile."""
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            log_file=log_file, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        """Monitor an already-running process via its pidfile (recovery)."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_tag, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        """Kill the monitored process, if one exists."""
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        """Return True once the pidfile reports a process (or we lost one)."""
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        """Return the monitored process; has_process() must be True."""
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        # Refresh self._state from the drone's pidfile.  Raises
        # _PidfileException (after resetting state) on unparseable contents.
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        # log + email the problem, then treat the process as lost
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        # core of _get_pidfile_info(); may raise _PidfileException
        if self.lost_process:
            # already gave up; keep the synthetic failure state
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
            pid=None, exit_status=None if autoserv has not yet run
            pid!=None, exit_status=None if autoserv is running
            pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        Declares the process lost only after PIDFILE_TIMEOUT has elapsed.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        """Return the process's exit status, or None while still running."""
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """Return the number of failed tests recorded in the pidfile."""
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001268
1269
mbligh36768f02008-02-22 18:28:33 +00001270class Agent(object):
showard77182562009-06-10 00:16:05 +00001271 """
1272 An agent for use by the Dispatcher class to perform a sequence of tasks.
1273
1274 The following methods are required on all task objects:
1275 poll() - Called periodically to let the task check its status and
1276 update its internal state. If the task succeeded.
1277 is_done() - Returns True if the task is finished.
1278 abort() - Called when an abort has been requested. The task must
1279 set its aborted attribute to True if it actually aborted.
1280
1281 The following attributes are required on all task objects:
1282 aborted - bool, True if this task was aborted.
1283 failure_tasks - A sequence of tasks to be run using a new Agent
1284 by the dispatcher should this task fail.
1285 success - bool, True if this task succeeded.
1286 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1287 host_ids - A sequence of Host ids this task represents.
1288
1289 The following attribute is written to all task objects:
1290 agent - A reference to the Agent instance that the task has been
1291 added to.
1292 """
1293
1294
showard170873e2009-01-07 00:22:26 +00001295 def __init__(self, tasks, num_processes=1):
showard77182562009-06-10 00:16:05 +00001296 """
1297 @param tasks: A list of tasks as described in the class docstring.
1298 @param num_processes: The number of subprocesses the Agent represents.
1299 This is used by the Dispatcher for managing the load on the
1300 system. Defaults to 1.
1301 """
jadmanski0afbb632008-06-06 21:10:57 +00001302 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001303 self.queue = None
showard77182562009-06-10 00:16:05 +00001304 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001305 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001306 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001307
showard170873e2009-01-07 00:22:26 +00001308 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1309 for task in tasks)
1310 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1311
showardd3dc1992009-04-22 21:01:40 +00001312 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001313 for task in tasks:
1314 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001315
1316
showardd3dc1992009-04-22 21:01:40 +00001317 def _clear_queue(self):
1318 self.queue = Queue.Queue(0)
1319
1320
showard170873e2009-01-07 00:22:26 +00001321 def _union_ids(self, id_lists):
1322 return set(itertools.chain(*id_lists))
1323
1324
jadmanski0afbb632008-06-06 21:10:57 +00001325 def add_task(self, task):
1326 self.queue.put_nowait(task)
1327 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001328
1329
jadmanski0afbb632008-06-06 21:10:57 +00001330 def tick(self):
showard21baa452008-10-21 00:08:39 +00001331 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001332 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001333 self.active_task.poll()
1334 if not self.active_task.is_done():
1335 return
1336 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001337
1338
jadmanski0afbb632008-06-06 21:10:57 +00001339 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001340 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001341 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001342 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001343 if not self.active_task.success:
1344 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001345 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001346
jadmanski0afbb632008-06-06 21:10:57 +00001347 if not self.is_done():
1348 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001349
1350
jadmanski0afbb632008-06-06 21:10:57 +00001351 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001352 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001353 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1354 # get reset.
1355 new_agent = Agent(self.active_task.failure_tasks)
1356 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001357
mblighe2586682008-02-29 22:45:46 +00001358
    def is_running(self):
        # "Running" once a task has been dequeued and made active.
        return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001361
1362
    def is_done(self):
        # Done when no task is active and nothing remains queued.
        return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001365
1366
showardd3dc1992009-04-22 21:01:40 +00001367 def abort(self):
showard08a36412009-05-05 01:01:13 +00001368 # abort tasks until the queue is empty or a task ignores the abort
1369 while not self.is_done():
1370 if not self.active_task:
1371 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001372 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001373 if not self.active_task.aborted:
1374 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001375 return
1376 self.active_task = None
1377
showardd3dc1992009-04-22 21:01:40 +00001378
class DelayedCallTask(object):
    """
    An AgentTask-like object that simply waits until at least the requested
    amount of time has elapsed, then invokes its callback exactly once and
    finishes.  If the callback returns a truthy value, it is assumed to be
    a new Agent instance and is handed to the dispatcher.

    @attribute end_time: absolute posix time after which polling this task
            fires the callback and marks the task finished.

    Also exposes every attribute the Agent class requires of a task.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: seconds from now after which the callback
                fires; must be positive.
        @param callback: callable invoked once after the delay; must return
                None or a new Agent instance.
        @param now_func: a time.time-like function (default: time.time);
                injectable for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # Attributes the Agent class expects on every task.
        self.aborted = False
        self.failure_tasks = ()
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        # Filled in by Agent.add_task().
        self.agent = None


    def poll(self):
        # Fire the callback once the deadline passes; clearing _callback
        # both marks completion and prevents a second invocation.
        if not self._callback or self._now_func() < self.end_time:
            return
        spawned_agent = self._callback()
        if spawned_agent:
            self.agent.dispatcher.add_agent(spawned_agent)
        self._callback = None
        self.success = True


    def is_done(self):
        return not self._callback


    def abort(self):
        # Aborting simply cancels the pending callback.
        self.aborted = True
        self._callback = None
1436
1437
class AgentTask(object):
    """
    Base class for a unit of work owned by an Agent.  Most tasks wrap a
    single autoserv process: run() launches it under a PidfileRunMonitor
    and tick() watches the pidfile until an exit code appears, at which
    point finished() records success/failure and runs the epilog.
    """
    def __init__(self, cmd, working_directory=None, failure_tasks=None,
                 pidfile_name=None, paired_with_pidfile=None):
        """
        @param cmd: command to execute (handed to PidfileRunMonitor.run()),
                or None for tasks that never launch a process themselves.
        @param working_directory: directory the process executes in.
        @param failure_tasks: tasks to run in a fresh Agent if this task
                finishes unsuccessfully (see Agent.on_task_failure).
        @param pidfile_name, paired_with_pidfile: accepted for signature
                compatibility but unused here; run() takes its own copies
                from subclasses instead.
        """
        self.done = False
        # Bug fix: the default used to be a mutable [] shared by every
        # AgentTask instance; use a None sentinel and a fresh list instead.
        if failure_tasks is None:
            failure_tasks = []
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None    # set by Agent.add_task()
        self.monitor = None  # PidfileRunMonitor, set by run()
        self.success = None
        self.aborted = False
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        """Record the host/queue-entry ids this task operates on, so the
        dispatcher can locate agents by host or queue entry."""
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Start the task if it hasn't been started, then check on it."""
        if not self.started:
            self.start()
        self.tick()


    def tick(self):
        """Finish the task once the monitored process's exit code becomes
        available; tasks with no monitor finish immediately as failures."""
        if self.monitor:
            exit_code = self.monitor.exit_code()
            if exit_code is None:
                return  # process still running
            success = (exit_code == 0)
        else:
            # no process was ever started
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Mark the task complete (idempotent) and run the epilog."""
        if self.done:
            return
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook run once before the task starts; subclasses override."""
        pass


    def create_temp_resultsdir(self, suffix=''):
        """Allocate a temporary results path on a drone.  The suffix
        argument is accepted for historical callers but not used."""
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        """Copy this task's log file into the results repository, if a
        process actually ran and a log file was configured."""
        if self.monitor and self.monitor.has_process() and self.log_file:
            _drone_manager.copy_to_results_repository(
                    self.monitor.get_process(), self.log_file)


    def epilog(self):
        """Hook run once after the task finishes; subclasses override and
        should call super."""
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the underlying process (if any) and mark the task aborted
        and done."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        """Point self.log_file at a timestamped, per-host log location."""
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        """Return the execution tag shared by all given queue entries,
        asserting that they agree."""
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_results(self, queue_entries, use_monitor=None):
        """Copy the job results to the results repository, using this
        task's own monitor unless another one is supplied."""
        assert len(queue_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        # trailing slash copies directory contents, not the directory itself
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(use_monitor.get_process(),
                                                  results_path)


    def _parse_results(self, queue_entries):
        """Schedule a FinalReparseTask for the entries; the wrapping Agent
        uses num_processes=0 since parsing is throttled separately."""
        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
        self._copy_results(queue_entries, use_monitor)
        self._parse_results(queue_entries)


    def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
        """Launch self.cmd (if set) under a new PidfileRunMonitor."""
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file,
                             pidfile_name=pidfile_name,
                             paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001577
1578
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    # name of the keyval file within a job's results directory
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        """Render a single 'key=value' keyval line."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Return the path of the keyval file; subclasses must override."""
        # Bug fix: the original raised the NotImplemented *constant*, which
        # produces a TypeError rather than the intended exception.
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append one keyval line to the job's keyval file, paired with the
        job's process so it lands on the right drone."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """Return the ('job_queued', <posix timestamp>) pair for a job."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        """Record the job's finish time in its keyval file."""
        self._write_keyval_after_job("job_finished", int(time.time()))
1606
1607
class RepairTask(AgentTask, TaskWithJobKeyvals):
    def __init__(self, host, queue_entry=None, task=None):
        """
        Run autoserv repair (-R) against a host.

        @param host: host to repair.
        @param queue_entry: queue entry to mark failed if this repair fails.
        @param task: existing SpecialTask model row to reuse, or None (a
                fresh one is prepared in prolog()).
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.set_host_log_file('repair', self.host)
        self._task = task


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # the entry goes back to the queue while its host is being repaired
        if self.queue_entry:
            self.queue_entry.requeue()

        # record this repair as a SpecialTask row (reusing self._task if one
        # was passed in)
        self.task_type = models.SpecialTask.Task.REPAIR
        self._task = models.SpecialTask.prepare(self, self._task)


    def _keyval_path(self):
        # keyvals live in this task's temporary results dir until copied
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Fail the associated queue entry: write job keyvals, publish the
        repair logs as the job's results, and mark the entry failed."""
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != 'Queued':
            return # entry has been aborted

        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry.execution_tag() + '/')

        self._copy_results([self.queue_entry])
        # parsing of failed repairs is optional per job
        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        self.queue_entry.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()

        # close out the SpecialTask row regardless of outcome
        self._task.finish()

        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001686
1687
class PreJobTask(AgentTask):
    """
    Base class for tasks run against a host before its job starts (verify
    and cleanup).  When such a task fails for a non-metahost entry, its log
    is copied into the entry's results so the user can see why the job
    never ran.
    """
    def epilog(self):
        super(PreJobTask, self).epilog()
        # only publish the log for failed, non-metahost entries
        if not self.queue_entry or self.success or self.queue_entry.meta_host:
            return
        self.queue_entry.set_execution_subdir()
        destination = os.path.join(self.queue_entry.execution_tag(),
                                   os.path.basename(self.log_file))
        _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file,
                destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001700
1701
class VerifyTask(PreJobTask):
    def __init__(self, queue_entry=None, host=None, task=None):
        """
        Run autoserv verify (-v) against a host, either standalone (host=)
        or on behalf of a queue entry (queue_entry=); exactly one of the
        two must be given.  On failure, a RepairTask is run.
        """
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        cmd = _autoserv_command_line(self.host.hostname, self.temp_results_dir,
                                     ['-v'], queue_entry=queue_entry)
        failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(cmd, self.temp_results_dir,
                                         failure_tasks=failure_tasks)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])
        # SpecialTask model row to reuse, or None; expanded in prolog()
        self._task = task


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')

        self.task_type = models.SpecialTask.Task.VERIFY

        # Prepare all SpecialTasks associated with this verify.
        # Include "active" verify tasks for recovery; we want the new log file
        # and started time to be recorded
        self._tasks = list(models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_complete=False,
                queue_entry__isnull=True))
        task_not_included = (
                not self._task
                or self._task.id not in [task.id for task in self._tasks])
        # NOTE(review): when self._task is None, this appends None and relies
        # on models.SpecialTask.prepare() to create a fresh row -- confirm.
        if task_not_included:
            self._tasks.append(self._task)
        for i in range(len(self._tasks)):
            self._tasks[i] = models.SpecialTask.prepare(self, self._tasks[i])


    def epilog(self):
        super(VerifyTask, self).epilog()

        # close out every SpecialTask row gathered in prolog()
        for task in self._tasks:
            task.finish()

        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001754
1755
class CleanupHostsMixin(object):
    """
    Mixin for tasks that must return hosts to a usable state once a job is
    over: either schedule a reboot (CleanupTask) or mark the host Ready.
    """
    def _reboot_hosts(self, job, queue_entries, final_success,
                      num_tests_failed):
        """
        @param job: job whose reboot_after policy is consulted.
        @param queue_entries: entries whose hosts need post-job handling.
        @param final_success: whether the job completed successfully.
        @param num_tests_failed: test failure count from the job.
        """
        reboot_after = job.reboot_after
        # Abort detection relies on _final_status, which only PostJobTask
        # subclasses define.  Use getattr so mixing this into a class
        # without that attribute (e.g. QueueTask) degrades gracefully
        # instead of raising AttributeError.
        final_status = getattr(self, '_final_status', None)
        do_reboot = (
                # always reboot after aborted jobs
                final_status == models.HostQueueEntry.Status.ABORTED
                or reboot_after == models.RebootAfter.ALWAYS
                or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0))

        for queue_entry in queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the
                # cleanup fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')
1775
1776
class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
    """Task that runs the main autoserv process for a job's queue entries."""
    def __init__(self, job, queue_entries, cmd, group_name=''):
        self.job = job
        self.queue_entries = queue_entries
        self.group_name = group_name
        super(QueueTask, self).__init__(cmd, self._execution_tag())
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        # job keyvals live directly in the execution directory
        return os.path.join(self._execution_tag(), self._KEYVAL_FILE)


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach a keyval file's initial contents to the execution, to be
        written out when the job's process is launched."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_tag(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        # per-host keyvals (platform and labels) under host_keyvals/<hostname>
        keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_tag(self):
        # all entries share one execution tag; use the first
        return self.queue_entries[0].execution_tag()


    def prolog(self):
        # record job/host keyvals and flip entries and hosts to Running
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        if self.group_name:
            keyval_dict['host_group_name'] = self.group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.update_field('started_on', datetime.datetime.now())
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        # marker file explaining that autoserv's process disappeared
        error_file_path = os.path.join(self._execution_tag(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Wrap up after autoserv ends: record finish time, then either hand
        off to a GatherLogsTask or (if no process ever ran) reboot/release
        the hosts directly."""
        if not self.monitor:
            return

        self._write_job_finished()

        # both of these conditionals can be true, iff the process ran, wrote a
        # pid to its pidfile, and then exited without writing an exit code
        if self.monitor.has_process():
            gather_task = GatherLogsTask(self.job, self.queue_entries)
            self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
        else:
            # NOTE(review): CleanupHostsMixin._reboot_hosts consults
            # self._final_status, which QueueTask does not define -- verify
            # this branch is safe.
            self._reboot_hosts(self.job, self.queue_entries,
                               final_success=False, num_tests_failed=0)

        if self.monitor.lost_process:
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        # append an INFO line to the job's status.log
        _drone_manager.write_lines_to_file(
            os.path.join(self._execution_tag(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job and when, in keyvals and status.log."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            # no recorded aborter; attribute it to the system
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001907
1908
class RecoveryQueueTask(QueueTask):
    """
    A QueueTask re-attached to an already-running autoserv process after a
    scheduler restart.  The process is represented by run_monitor; this
    task never launches anything itself.
    """
    def __init__(self, job, queue_entries, run_monitor):
        super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
        self.monitor = run_monitor
        self.started = True
        # since we set started=True here, prolog() and run() shouldn't be called


    def run(self):
        # Bug fix: this used to raise NotImplemented('...'), but
        # NotImplemented is a constant and not callable, which produced a
        # confusing TypeError instead of the intended exception.
        raise NotImplementedError('This should never be called')


    def prolog(self):
        raise NotImplementedError('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001923
1924
class PostJobTask(AgentTask):
    """
    Base class for tasks that run after a job's autoserv process ends (log
    gathering, final reparse).  Attaches to the finished autoserv process
    via its pidfile to determine the job's outcome.
    """
    def __init__(self, queue_entries, pidfile_name, logfile_name,
                 run_monitor=None):
        """
        @param queue_entries: entries covered by this task; they must share
                a single execution tag.
        @param pidfile_name: pidfile name for *this* task's own process.
        @param logfile_name: this task's log file name within the execution
                directory.
        @param run_monitor: if not None, we're recovering a running task.
        """
        self._queue_entries = queue_entries
        self._pidfile_name = pidfile_name

        self._execution_tag = self._get_consistent_execution_tag(queue_entries)
        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
        # monitor on the *autoserv* process whose results we're processing
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
        self._paired_with_pidfile = self._autoserv_monitor.pidfile_id

        if _testing_mode:
            command = 'true'
        else:
            command = self._generate_command(self._results_dir)

        super(PostJobTask, self).__init__(cmd=command,
                                          working_directory=self._execution_tag)
        # this must happen *after* the super call (which resets self.monitor)
        self.monitor = run_monitor
        if run_monitor:
            self.started = True

        self.log_file = os.path.join(self._execution_tag, logfile_name)
        self._final_status = self._determine_final_status()


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    def _job_was_aborted(self):
        """Return True if the job's entries were aborted.  Entries with
        inconsistent abort state trigger a warning email and are treated
        as aborted."""
        was_aborted = None
        for queue_entry in self._queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                # Bug fix: join over a per-entry generator.  The original
                # joined the *characters* of a single formatted string,
                # producing garbage in the notification email.
                entries_description = ', '.join(
                        '%s (%s)' % (entry, entry.aborted)
                        for entry in self._queue_entries)
                email_manager.manager.enqueue_notify_email(
                    'Inconsistent abort state',
                    'Queue entries have inconsistent abort state: ' +
                    entries_description)
                # don't crash here, just assume true
                return True
        return was_aborted


    def _determine_final_status(self):
        """Map the autoserv outcome onto a HostQueueEntry status."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def run(self):
        # recovered tasks (run_monitor set) never reach run()
        assert not self.monitor

        # make sure we actually have results to work with.
        # this should never happen in normal operation.
        if not self._autoserv_monitor.has_process():
            email_manager.manager.enqueue_notify_email(
                'No results in post-job task',
                'No results in post-job task at %s' %
                self._autoserv_monitor.pidfile_id)
            self.finished(False)
            return

        super(PostJobTask, self).run(
            pidfile_name=self._pidfile_name,
            paired_with_pidfile=self._paired_with_pidfile)


    def _set_all_statuses(self, status):
        for queue_entry in self._queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass
2013
2014
class GatherLogsTask(PostJobTask, CleanupHostsMixin):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        # run autoserv --collect-crashinfo against every host in the job
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self._queue_entries)
        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        # publish and parse results only if autoserv actually ran
        if self._autoserv_monitor.has_process():
            self._copy_and_parse_results(self._queue_entries,
                                         use_monitor=self._autoserv_monitor)

        final_success = (
            self._final_status == models.HostQueueEntry.Status.COMPLETED)
        num_tests_failed = self._autoserv_monitor.num_tests_failed()
        self._reboot_hosts(self._job, self._queue_entries, final_success,
                           num_tests_failed)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            # clean exit: nothing to collect, finish successfully right away
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002064
2065
showard8fe93b52008-11-18 17:53:22 +00002066class CleanupTask(PreJobTask):
showard2fe3f1d2009-07-06 20:19:11 +00002067 def __init__(self, host=None, queue_entry=None, task=None):
showardfa8629c2008-11-04 16:51:23 +00002068 assert bool(host) ^ bool(queue_entry)
2069 if queue_entry:
2070 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00002071 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00002072 self.host = host
showard170873e2009-01-07 00:22:26 +00002073
2074 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00002075 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
2076 ['--cleanup'],
2077 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00002078 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00002079 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
2080 failure_tasks=[repair_task])
2081
2082 self._set_ids(host=host, queue_entries=[queue_entry])
2083 self.set_host_log_file('cleanup', self.host)
showard2fe3f1d2009-07-06 20:19:11 +00002084 self._task = task
mbligh16c722d2008-03-05 00:58:44 +00002085
mblighd5c95802008-03-05 00:33:46 +00002086
jadmanski0afbb632008-06-06 21:10:57 +00002087 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002088 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002089 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00002090 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00002091
showard2fe3f1d2009-07-06 20:19:11 +00002092 self.task_type = models.SpecialTask.Task.CLEANUP
2093 self._task = models.SpecialTask.prepare(self, self._task)
2094
mblighd5c95802008-03-05 00:33:46 +00002095
showard21baa452008-10-21 00:08:39 +00002096 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002097 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002098
2099 self._task.finish()
2100
showard21baa452008-10-21 00:08:39 +00002101 if self.success:
showardfa8629c2008-11-04 16:51:23 +00002102 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00002103 self.host.update_field('dirty', 0)
2104
2105
showardd3dc1992009-04-22 21:01:40 +00002106class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002107 _num_running_parses = 0
2108
showardd3dc1992009-04-22 21:01:40 +00002109 def __init__(self, queue_entries, run_monitor=None):
2110 super(FinalReparseTask, self).__init__(queue_entries,
2111 pidfile_name=_PARSER_PID_FILE,
2112 logfile_name='.parse.log',
2113 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00002114 # don't use _set_ids, since we don't want to set the host_ids
2115 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard5add1c82009-05-26 19:27:46 +00002116 self._parse_started = (run_monitor is not None)
showard97aed502008-11-04 02:01:24 +00002117
showard97aed502008-11-04 02:01:24 +00002118
2119 @classmethod
2120 def _increment_running_parses(cls):
2121 cls._num_running_parses += 1
2122
2123
2124 @classmethod
2125 def _decrement_running_parses(cls):
2126 cls._num_running_parses -= 1
2127
2128
2129 @classmethod
2130 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002131 return (cls._num_running_parses <
2132 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002133
2134
2135 def prolog(self):
2136 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002137 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00002138
2139
2140 def epilog(self):
2141 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002142 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002143
2144
showardd3dc1992009-04-22 21:01:40 +00002145 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002146 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002147 results_dir]
showard97aed502008-11-04 02:01:24 +00002148
2149
showard08a36412009-05-05 01:01:13 +00002150 def tick(self):
2151 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002152 # and we can, at which point we revert to default behavior
2153 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002154 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002155 else:
2156 self._try_starting_parse()
2157
2158
2159 def run(self):
2160 # override run() to not actually run unless we can
2161 self._try_starting_parse()
2162
2163
2164 def _try_starting_parse(self):
2165 if not self._can_run_new_parse():
2166 return
showard170873e2009-01-07 00:22:26 +00002167
showard97aed502008-11-04 02:01:24 +00002168 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002169 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002170
showard97aed502008-11-04 02:01:24 +00002171 self._increment_running_parses()
2172 self._parse_started = True
2173
2174
2175 def finished(self, success):
2176 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002177 if self._parse_started:
2178 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002179
2180
showardc9ae1782009-01-30 01:42:37 +00002181class SetEntryPendingTask(AgentTask):
2182 def __init__(self, queue_entry):
2183 super(SetEntryPendingTask, self).__init__(cmd='')
2184 self._queue_entry = queue_entry
2185 self._set_ids(queue_entries=[queue_entry])
2186
2187
2188 def run(self):
2189 agent = self._queue_entry.on_pending()
2190 if agent:
2191 self.agent.dispatcher.add_agent(agent)
2192 self.finished(True)
2193
2194
showarda3c58572009-03-12 20:36:59 +00002195class DBError(Exception):
2196 """Raised by the DBObject constructor when its select fails."""
2197
2198
mbligh36768f02008-02-22 18:28:33 +00002199class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002200 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002201
2202 # Subclasses MUST override these:
2203 _table_name = ''
2204 _fields = ()
2205
showarda3c58572009-03-12 20:36:59 +00002206 # A mapping from (type, id) to the instance of the object for that
2207 # particular id. This prevents us from creating new Job() and Host()
2208 # instances for every HostQueueEntry object that we instantiate as
2209 # multiple HQEs often share the same Job.
2210 _instances_by_type_and_id = weakref.WeakValueDictionary()
2211 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002212
showarda3c58572009-03-12 20:36:59 +00002213
2214 def __new__(cls, id=None, **kwargs):
2215 """
2216 Look to see if we already have an instance for this particular type
2217 and id. If so, use it instead of creating a duplicate instance.
2218 """
2219 if id is not None:
2220 instance = cls._instances_by_type_and_id.get((cls, id))
2221 if instance:
2222 return instance
2223 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2224
2225
2226 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00002227 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00002228 assert self._table_name, '_table_name must be defined in your class'
2229 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002230 if not new_record:
2231 if self._initialized and not always_query:
2232 return # We've already been initialized.
2233 if id is None:
2234 id = row[0]
2235 # Tell future constructors to use us instead of re-querying while
2236 # this instance is still around.
2237 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002238
showard6ae5ea92009-02-25 00:11:51 +00002239 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002240
jadmanski0afbb632008-06-06 21:10:57 +00002241 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002242
jadmanski0afbb632008-06-06 21:10:57 +00002243 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002244 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002245
showarda3c58572009-03-12 20:36:59 +00002246 if self._initialized:
2247 differences = self._compare_fields_in_row(row)
2248 if differences:
showard7629f142009-03-27 21:02:02 +00002249 logging.warn(
2250 'initialized %s %s instance requery is updating: %s',
2251 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002252 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002253 self._initialized = True
2254
2255
2256 @classmethod
2257 def _clear_instance_cache(cls):
2258 """Used for testing, clear the internal instance cache."""
2259 cls._instances_by_type_and_id.clear()
2260
2261
showardccbd6c52009-03-21 00:10:21 +00002262 def _fetch_row_from_db(self, row_id):
2263 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2264 rows = _db.execute(sql, (row_id,))
2265 if not rows:
showard76e29d12009-04-15 21:53:10 +00002266 raise DBError("row not found (table=%s, row id=%s)"
2267 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002268 return rows[0]
2269
2270
showarda3c58572009-03-12 20:36:59 +00002271 def _assert_row_length(self, row):
2272 assert len(row) == len(self._fields), (
2273 "table = %s, row = %s/%d, fields = %s/%d" % (
2274 self.__table, row, len(row), self._fields, len(self._fields)))
2275
2276
2277 def _compare_fields_in_row(self, row):
2278 """
2279 Given a row as returned by a SELECT query, compare it to our existing
2280 in memory fields.
2281
2282 @param row - A sequence of values corresponding to fields named in
2283 The class attribute _fields.
2284
2285 @returns A dictionary listing the differences keyed by field name
2286 containing tuples of (current_value, row_value).
2287 """
2288 self._assert_row_length(row)
2289 differences = {}
2290 for field, row_value in itertools.izip(self._fields, row):
2291 current_value = getattr(self, field)
2292 if current_value != row_value:
2293 differences[field] = (current_value, row_value)
2294 return differences
showard2bab8f42008-11-12 18:15:22 +00002295
2296
2297 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002298 """
2299 Update our field attributes using a single row returned by SELECT.
2300
2301 @param row - A sequence of values corresponding to fields named in
2302 the class fields list.
2303 """
2304 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002305
showard2bab8f42008-11-12 18:15:22 +00002306 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002307 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002308 setattr(self, field, value)
2309 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002310
showard2bab8f42008-11-12 18:15:22 +00002311 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002312
mblighe2586682008-02-29 22:45:46 +00002313
showardccbd6c52009-03-21 00:10:21 +00002314 def update_from_database(self):
2315 assert self.id is not None
2316 row = self._fetch_row_from_db(self.id)
2317 self._update_fields_from_row(row)
2318
2319
jadmanski0afbb632008-06-06 21:10:57 +00002320 def count(self, where, table = None):
2321 if not table:
2322 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002323
jadmanski0afbb632008-06-06 21:10:57 +00002324 rows = _db.execute("""
2325 SELECT count(*) FROM %s
2326 WHERE %s
2327 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002328
jadmanski0afbb632008-06-06 21:10:57 +00002329 assert len(rows) == 1
2330
2331 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002332
2333
showardd3dc1992009-04-22 21:01:40 +00002334 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002335 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002336
showard2bab8f42008-11-12 18:15:22 +00002337 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002338 return
mbligh36768f02008-02-22 18:28:33 +00002339
mblighf8c624d2008-07-03 16:58:45 +00002340 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002341 _db.execute(query, (value, self.id))
2342
showard2bab8f42008-11-12 18:15:22 +00002343 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002344
2345
jadmanski0afbb632008-06-06 21:10:57 +00002346 def save(self):
2347 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002348 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002349 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002350 values = []
2351 for key in keys:
2352 value = getattr(self, key)
2353 if value is None:
2354 values.append('NULL')
2355 else:
2356 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002357 values_str = ','.join(values)
2358 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2359 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002360 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002361 # Update our id to the one the database just assigned to us.
2362 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002363
2364
jadmanski0afbb632008-06-06 21:10:57 +00002365 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002366 self._instances_by_type_and_id.pop((type(self), id), None)
2367 self._initialized = False
2368 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002369 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2370 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002371
2372
showard63a34772008-08-18 19:32:50 +00002373 @staticmethod
2374 def _prefix_with(string, prefix):
2375 if string:
2376 string = prefix + string
2377 return string
2378
2379
jadmanski0afbb632008-06-06 21:10:57 +00002380 @classmethod
showard989f25d2008-10-01 11:38:11 +00002381 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002382 """
2383 Construct instances of our class based on the given database query.
2384
2385 @yields One class instance for each row fetched.
2386 """
showard63a34772008-08-18 19:32:50 +00002387 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2388 where = cls._prefix_with(where, 'WHERE ')
2389 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002390 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002391 'joins' : joins,
2392 'where' : where,
2393 'order_by' : order_by})
2394 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002395 for row in rows:
2396 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002397
mbligh36768f02008-02-22 18:28:33 +00002398
class IneligibleHostQueue(DBObject):
    # One (job, host) pair blocked from being scheduled together; rows are
    # created/removed by HostQueueEntry.block_host()/unblock_host().
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002402
2403
showard89f84db2009-03-12 20:39:13 +00002404class AtomicGroup(DBObject):
2405 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002406 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2407 'invalid')
showard89f84db2009-03-12 20:39:13 +00002408
2409
showard989f25d2008-10-01 11:38:11 +00002410class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002411 _table_name = 'labels'
2412 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002413 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002414
2415
showard6157c632009-07-06 20:19:31 +00002416 def __repr__(self):
2417 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
2418 self.name, self.id, self.atomic_group_id)
2419
2420
mbligh36768f02008-02-22 18:28:33 +00002421class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002422 _table_name = 'hosts'
2423 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2424 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2425
2426
jadmanski0afbb632008-06-06 21:10:57 +00002427 def current_task(self):
2428 rows = _db.execute("""
2429 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2430 """, (self.id,))
2431
2432 if len(rows) == 0:
2433 return None
2434 else:
2435 assert len(rows) == 1
2436 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002437 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002438
2439
jadmanski0afbb632008-06-06 21:10:57 +00002440 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002441 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002442 if self.current_task():
2443 self.current_task().requeue()
2444
showard6ae5ea92009-02-25 00:11:51 +00002445
jadmanski0afbb632008-06-06 21:10:57 +00002446 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002447 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002448 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002449
2450
showard170873e2009-01-07 00:22:26 +00002451 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002452 """
showard170873e2009-01-07 00:22:26 +00002453 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002454 """
2455 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002456 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002457 FROM labels
2458 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002459 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002460 ORDER BY labels.name
2461 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002462 platform = None
2463 all_labels = []
2464 for label_name, is_platform in rows:
2465 if is_platform:
2466 platform = label_name
2467 all_labels.append(label_name)
2468 return platform, all_labels
2469
2470
showard2fe3f1d2009-07-06 20:19:11 +00002471 def reverify_tasks(self):
2472 cleanup_task = CleanupTask(host=self)
2473 verify_task = VerifyTask(host=self)
2474
showard6d7b2ff2009-06-10 00:16:47 +00002475 # just to make sure this host does not get taken away
showard2fe3f1d2009-07-06 20:19:11 +00002476 self.set_status('Cleaning')
2477 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002478
2479
showard54c1ea92009-05-20 00:32:58 +00002480 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2481
2482
2483 @classmethod
2484 def cmp_for_sort(cls, a, b):
2485 """
2486 A comparison function for sorting Host objects by hostname.
2487
2488 This strips any trailing numeric digits, ignores leading 0s and
2489 compares hostnames by the leading name and the trailing digits as a
2490 number. If both hostnames do not match this pattern, they are simply
2491 compared as lower case strings.
2492
2493 Example of how hostnames will be sorted:
2494
2495 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2496
2497 This hopefully satisfy most people's hostname sorting needs regardless
2498 of their exact naming schemes. Nobody sane should have both a host10
2499 and host010 (but the algorithm works regardless).
2500 """
2501 lower_a = a.hostname.lower()
2502 lower_b = b.hostname.lower()
2503 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2504 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2505 if match_a and match_b:
2506 name_a, number_a_str = match_a.groups()
2507 name_b, number_b_str = match_b.groups()
2508 number_a = int(number_a_str.lstrip('0'))
2509 number_b = int(number_b_str.lstrip('0'))
2510 result = cmp((name_a, number_a), (name_b, number_b))
2511 if result == 0 and lower_a != lower_b:
2512 # If they compared equal above but the lower case names are
2513 # indeed different, don't report equality. abc012 != abc12.
2514 return cmp(lower_a, lower_b)
2515 return result
2516 else:
2517 return cmp(lower_a, lower_b)
2518
2519
mbligh36768f02008-02-22 18:28:33 +00002520class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002521 _table_name = 'host_queue_entries'
2522 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002523 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002524 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002525
2526
showarda3c58572009-03-12 20:36:59 +00002527 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002528 assert id or row
showarda3c58572009-03-12 20:36:59 +00002529 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002530 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002531
jadmanski0afbb632008-06-06 21:10:57 +00002532 if self.host_id:
2533 self.host = Host(self.host_id)
2534 else:
2535 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002536
showard77182562009-06-10 00:16:05 +00002537 if self.atomic_group_id:
2538 self.atomic_group = AtomicGroup(self.atomic_group_id,
2539 always_query=False)
2540 else:
2541 self.atomic_group = None
2542
showard170873e2009-01-07 00:22:26 +00002543 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002544 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002545
2546
showard89f84db2009-03-12 20:39:13 +00002547 @classmethod
2548 def clone(cls, template):
2549 """
2550 Creates a new row using the values from a template instance.
2551
2552 The new instance will not exist in the database or have a valid
2553 id attribute until its save() method is called.
2554 """
2555 assert isinstance(template, cls)
2556 new_row = [getattr(template, field) for field in cls._fields]
2557 clone = cls(row=new_row, new_record=True)
2558 clone.id = None
2559 return clone
2560
2561
showardc85c21b2008-11-24 22:17:37 +00002562 def _view_job_url(self):
2563 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2564
2565
showardf1ae3542009-05-11 19:26:02 +00002566 def get_labels(self):
2567 """
2568 Get all labels associated with this host queue entry (either via the
2569 meta_host or as a job dependency label). The labels yielded are not
2570 guaranteed to be unique.
2571
2572 @yields Label instances associated with this host_queue_entry.
2573 """
2574 if self.meta_host:
2575 yield Label(id=self.meta_host, always_query=False)
2576 labels = Label.fetch(
2577 joins="JOIN jobs_dependency_labels AS deps "
2578 "ON (labels.id = deps.label_id)",
2579 where="deps.job_id = %d" % self.job.id)
2580 for label in labels:
2581 yield label
2582
2583
jadmanski0afbb632008-06-06 21:10:57 +00002584 def set_host(self, host):
2585 if host:
2586 self.queue_log_record('Assigning host ' + host.hostname)
2587 self.update_field('host_id', host.id)
2588 self.update_field('active', True)
2589 self.block_host(host.id)
2590 else:
2591 self.queue_log_record('Releasing host')
2592 self.unblock_host(self.host.id)
2593 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002594
jadmanski0afbb632008-06-06 21:10:57 +00002595 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002596
2597
jadmanski0afbb632008-06-06 21:10:57 +00002598 def get_host(self):
2599 return self.host
mbligh36768f02008-02-22 18:28:33 +00002600
2601
jadmanski0afbb632008-06-06 21:10:57 +00002602 def queue_log_record(self, log_line):
2603 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002604 _drone_manager.write_lines_to_file(self.queue_log_path,
2605 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002606
2607
jadmanski0afbb632008-06-06 21:10:57 +00002608 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002609 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002610 row = [0, self.job.id, host_id]
2611 block = IneligibleHostQueue(row=row, new_record=True)
2612 block.save()
mblighe2586682008-02-29 22:45:46 +00002613
2614
jadmanski0afbb632008-06-06 21:10:57 +00002615 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002616 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002617 blocks = IneligibleHostQueue.fetch(
2618 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2619 for block in blocks:
2620 block.delete()
mblighe2586682008-02-29 22:45:46 +00002621
2622
showard2bab8f42008-11-12 18:15:22 +00002623 def set_execution_subdir(self, subdir=None):
2624 if subdir is None:
2625 assert self.get_host()
2626 subdir = self.get_host().hostname
2627 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002628
2629
showard6355f6b2008-12-05 18:52:13 +00002630 def _get_hostname(self):
2631 if self.host:
2632 return self.host.hostname
2633 return 'no host'
2634
2635
showard170873e2009-01-07 00:22:26 +00002636 def __str__(self):
2637 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2638
2639
jadmanski0afbb632008-06-06 21:10:57 +00002640 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002641 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002642
showardb18134f2009-03-20 20:52:18 +00002643 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002644
showardc85c21b2008-11-24 22:17:37 +00002645 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002646 self.update_field('complete', False)
2647 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002648
jadmanski0afbb632008-06-06 21:10:57 +00002649 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002650 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002651 self.update_field('complete', False)
2652 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002653
showardc85c21b2008-11-24 22:17:37 +00002654 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002655 self.update_field('complete', True)
2656 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002657
2658 should_email_status = (status.lower() in _notify_email_statuses or
2659 'all' in _notify_email_statuses)
2660 if should_email_status:
2661 self._email_on_status(status)
2662
2663 self._email_on_job_complete()
2664
2665
2666 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002667 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002668
2669 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2670 self.job.id, self.job.name, hostname, status)
2671 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2672 self.job.id, self.job.name, hostname, status,
2673 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002674 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002675
2676
2677 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002678 if not self.job.is_finished():
2679 return
showard542e8402008-09-19 20:16:18 +00002680
showardc85c21b2008-11-24 22:17:37 +00002681 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002682 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002683 for queue_entry in hosts_queue:
2684 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002685 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002686 queue_entry.status))
2687
2688 summary_text = "\n".join(summary_text)
2689 status_counts = models.Job.objects.get_status_counts(
2690 [self.job.id])[self.job.id]
2691 status = ', '.join('%d %s' % (count, status) for status, count
2692 in status_counts.iteritems())
2693
2694 subject = 'Autotest: Job ID: %s "%s" %s' % (
2695 self.job.id, self.job.name, status)
2696 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2697 self.job.id, self.job.name, status, self._view_job_url(),
2698 summary_text)
showard170873e2009-01-07 00:22:26 +00002699 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002700
2701
showard77182562009-06-10 00:16:05 +00002702 def run_pre_job_tasks(self, assigned_host=None):
2703 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002704 assert assigned_host
2705 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002706 if self.host_id is None:
2707 self.set_host(assigned_host)
2708 else:
2709 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002710
showardb18134f2009-03-20 20:52:18 +00002711 logging.info("%s/%s/%s scheduled on %s, status=%s",
2712 self.job.name, self.meta_host, self.atomic_group_id,
2713 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002714
showard77182562009-06-10 00:16:05 +00002715 return self._do_run_pre_job_tasks()
2716
2717
2718 def _do_run_pre_job_tasks(self):
2719 # Every host goes thru the Verifying stage (which may or may not
2720 # actually do anything as determined by get_pre_job_tasks).
2721 self.set_status(models.HostQueueEntry.Status.VERIFYING)
2722
2723 # The pre job tasks always end with a SetEntryPendingTask which
2724 # will continue as appropriate through queue_entry.on_pending().
2725 return Agent(self.job.get_pre_job_tasks(queue_entry=self))
mblighe2586682008-02-29 22:45:46 +00002726
showard6ae5ea92009-02-25 00:11:51 +00002727
jadmanski0afbb632008-06-06 21:10:57 +00002728 def requeue(self):
2729 self.set_status('Queued')
showard12f3e322009-05-13 21:27:42 +00002730 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002731 # verify/cleanup failure sets the execution subdir, so reset it here
2732 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002733 if self.meta_host:
2734 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002735
2736
jadmanski0afbb632008-06-06 21:10:57 +00002737 def handle_host_failure(self):
2738 """\
2739 Called when this queue entry's host has failed verification and
2740 repair.
2741 """
2742 assert not self.meta_host
2743 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002744 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002745
2746
jadmanskif7fa2cc2008-10-01 14:13:23 +00002747 @property
2748 def aborted_by(self):
2749 self._load_abort_info()
2750 return self._aborted_by
2751
2752
2753 @property
2754 def aborted_on(self):
2755 self._load_abort_info()
2756 return self._aborted_on
2757
2758
2759 def _load_abort_info(self):
2760 """ Fetch info about who aborted the job. """
2761 if hasattr(self, "_aborted_by"):
2762 return
2763 rows = _db.execute("""
2764 SELECT users.login, aborted_host_queue_entries.aborted_on
2765 FROM aborted_host_queue_entries
2766 INNER JOIN users
2767 ON users.id = aborted_host_queue_entries.aborted_by_id
2768 WHERE aborted_host_queue_entries.queue_entry_id = %s
2769 """, (self.id,))
2770 if rows:
2771 self._aborted_by, self._aborted_on = rows[0]
2772 else:
2773 self._aborted_by = self._aborted_on = None
2774
2775
showardb2e2c322008-10-14 17:33:55 +00002776 def on_pending(self):
2777 """
2778 Called when an entry in a synchronous job has passed verify. If the
2779 job is ready to run, returns an agent to run the job. Returns None
2780 otherwise.
2781 """
2782 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002783 self.get_host().set_status('Pending')
showard77182562009-06-10 00:16:05 +00002784 return self.job.run_if_ready(queue_entry=self)
showardb2e2c322008-10-14 17:33:55 +00002785
2786
showardd3dc1992009-04-22 21:01:40 +00002787 def abort(self, dispatcher):
2788 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002789
showardd3dc1992009-04-22 21:01:40 +00002790 Status = models.HostQueueEntry.Status
2791 has_running_job_agent = (
2792 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2793 and dispatcher.get_agents_for_entry(self))
2794 if has_running_job_agent:
2795 # do nothing; post-job tasks will finish and then mark this entry
2796 # with status "Aborted" and take care of the host
2797 return
2798
2799 if self.status in (Status.STARTING, Status.PENDING):
2800 self.host.set_status(models.Host.Status.READY)
2801 elif self.status == Status.VERIFYING:
2802 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2803
2804 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002805
2806 def execution_tag(self):
2807 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00002808 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002809
2810
mbligh36768f02008-02-22 18:28:33 +00002811class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002812 _table_name = 'jobs'
2813 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2814 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002815 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002816 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002817
showard77182562009-06-10 00:16:05 +00002818 # This does not need to be a column in the DB. The delays are likely to
2819 # be configured short. If the scheduler is stopped and restarted in
2820 # the middle of a job's delay cycle, the delay cycle will either be
2821 # repeated or skipped depending on the number of Pending machines found
2822 # when the restarted scheduler recovers to track it. Not a problem.
2823 #
2824 # A reference to the DelayedCallTask that will wake up the job should
2825 # no other HQEs change state in time. Its end_time attribute is used
2826 # by our run_with_ready_delay() method to determine if the wait is over.
2827 _delay_ready_task = None
2828
2829 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
2830 # all status='Pending' atomic group HQEs incase a delay was running when the
2831 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00002832
showarda3c58572009-03-12 20:36:59 +00002833 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002834 assert id or row
showarda3c58572009-03-12 20:36:59 +00002835 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002836
mblighe2586682008-02-29 22:45:46 +00002837
jadmanski0afbb632008-06-06 21:10:57 +00002838 def is_server_job(self):
2839 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002840
2841
showard170873e2009-01-07 00:22:26 +00002842 def tag(self):
2843 return "%s-%s" % (self.id, self.owner)
2844
2845
jadmanski0afbb632008-06-06 21:10:57 +00002846 def get_host_queue_entries(self):
2847 rows = _db.execute("""
2848 SELECT * FROM host_queue_entries
2849 WHERE job_id= %s
2850 """, (self.id,))
2851 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002852
jadmanski0afbb632008-06-06 21:10:57 +00002853 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002854
jadmanski0afbb632008-06-06 21:10:57 +00002855 return entries
mbligh36768f02008-02-22 18:28:33 +00002856
2857
jadmanski0afbb632008-06-06 21:10:57 +00002858 def set_status(self, status, update_queues=False):
2859 self.update_field('status',status)
2860
2861 if update_queues:
2862 for queue_entry in self.get_host_queue_entries():
2863 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002864
2865
showard77182562009-06-10 00:16:05 +00002866 def _atomic_and_has_started(self):
2867 """
2868 @returns True if any of the HostQueueEntries associated with this job
2869 have entered the Status.STARTING state or beyond.
2870 """
2871 atomic_entries = models.HostQueueEntry.objects.filter(
2872 job=self.id, atomic_group__isnull=False)
2873 if atomic_entries.count() <= 0:
2874 return False
2875
showardaf8b4ca2009-06-16 18:47:26 +00002876 # These states may *only* be reached if Job.run() has been called.
2877 started_statuses = (models.HostQueueEntry.Status.STARTING,
2878 models.HostQueueEntry.Status.RUNNING,
2879 models.HostQueueEntry.Status.COMPLETED)
2880
2881 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00002882 return started_entries.count() > 0
2883
2884
2885 def _pending_count(self):
2886 """The number of HostQueueEntries for this job in the Pending state."""
2887 pending_entries = models.HostQueueEntry.objects.filter(
2888 job=self.id, status=models.HostQueueEntry.Status.PENDING)
2889 return pending_entries.count()
2890
2891
jadmanski0afbb632008-06-06 21:10:57 +00002892 def is_ready(self):
showard77182562009-06-10 00:16:05 +00002893 # NOTE: Atomic group jobs stop reporting ready after they have been
2894 # started to avoid launching multiple copies of one atomic job.
2895 # Only possible if synch_count is less than than half the number of
2896 # machines in the atomic group.
2897 return (self._pending_count() >= self.synch_count
2898 and not self._atomic_and_has_started())
mbligh36768f02008-02-22 18:28:33 +00002899
2900
jadmanski0afbb632008-06-06 21:10:57 +00002901 def num_machines(self, clause = None):
2902 sql = "job_id=%s" % self.id
2903 if clause:
2904 sql += " AND (%s)" % clause
2905 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002906
2907
jadmanski0afbb632008-06-06 21:10:57 +00002908 def num_queued(self):
2909 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002910
2911
jadmanski0afbb632008-06-06 21:10:57 +00002912 def num_active(self):
2913 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002914
2915
jadmanski0afbb632008-06-06 21:10:57 +00002916 def num_complete(self):
2917 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002918
2919
jadmanski0afbb632008-06-06 21:10:57 +00002920 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002921 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002922
mbligh36768f02008-02-22 18:28:33 +00002923
showard6bb7c292009-01-30 01:44:51 +00002924 def _not_yet_run_entries(self, include_verifying=True):
2925 statuses = [models.HostQueueEntry.Status.QUEUED,
2926 models.HostQueueEntry.Status.PENDING]
2927 if include_verifying:
2928 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2929 return models.HostQueueEntry.objects.filter(job=self.id,
2930 status__in=statuses)
2931
2932
2933 def _stop_all_entries(self):
2934 entries_to_stop = self._not_yet_run_entries(
2935 include_verifying=False)
2936 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002937 assert not child_entry.complete, (
2938 '%s status=%s, active=%s, complete=%s' %
2939 (child_entry.id, child_entry.status, child_entry.active,
2940 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002941 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2942 child_entry.host.status = models.Host.Status.READY
2943 child_entry.host.save()
2944 child_entry.status = models.HostQueueEntry.Status.STOPPED
2945 child_entry.save()
2946
showard2bab8f42008-11-12 18:15:22 +00002947 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002948 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002949 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002950 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002951
2952
jadmanski0afbb632008-06-06 21:10:57 +00002953 def write_to_machines_file(self, queue_entry):
2954 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002955 file_path = os.path.join(self.tag(), '.machines')
2956 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002957
2958
showardf1ae3542009-05-11 19:26:02 +00002959 def _next_group_name(self, group_name=''):
2960 """@returns a directory name to use for the next host group results."""
2961 if group_name:
2962 # Sanitize for use as a pathname.
2963 group_name = group_name.replace(os.path.sep, '_')
2964 if group_name.startswith('.'):
2965 group_name = '_' + group_name[1:]
2966 # Add a separator between the group name and 'group%d'.
2967 group_name += '.'
2968 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00002969 query = models.HostQueueEntry.objects.filter(
2970 job=self.id).values('execution_subdir').distinct()
2971 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00002972 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
2973 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00002974 if ids:
2975 next_id = max(ids) + 1
2976 else:
2977 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00002978 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00002979
2980
showard170873e2009-01-07 00:22:26 +00002981 def _write_control_file(self, execution_tag):
2982 control_path = _drone_manager.attach_file_to_execution(
2983 execution_tag, self.control_file)
2984 return control_path
mbligh36768f02008-02-22 18:28:33 +00002985
showardb2e2c322008-10-14 17:33:55 +00002986
showard2bab8f42008-11-12 18:15:22 +00002987 def get_group_entries(self, queue_entry_from_group):
2988 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002989 return list(HostQueueEntry.fetch(
2990 where='job_id=%s AND execution_subdir=%s',
2991 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002992
2993
showardb2e2c322008-10-14 17:33:55 +00002994 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002995 assert queue_entries
2996 execution_tag = queue_entries[0].execution_tag()
2997 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002998 hostnames = ','.join([entry.get_host().hostname
2999 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00003000
showard87ba02a2009-04-20 19:37:32 +00003001 params = _autoserv_command_line(
3002 hostnames, execution_tag,
3003 ['-P', execution_tag, '-n',
3004 _drone_manager.absolute_path(control_path)],
showarde9c69362009-06-30 01:58:03 +00003005 job=self, verbose=False)
mbligh36768f02008-02-22 18:28:33 +00003006
jadmanski0afbb632008-06-06 21:10:57 +00003007 if not self.is_server_job():
3008 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00003009
showardb2e2c322008-10-14 17:33:55 +00003010 return params
mblighe2586682008-02-29 22:45:46 +00003011
mbligh36768f02008-02-22 18:28:33 +00003012
showardc9ae1782009-01-30 01:42:37 +00003013 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00003014 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00003015 return True
showard0fc38302008-10-23 00:44:07 +00003016 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00003017 return queue_entry.get_host().dirty
3018 return False
showard21baa452008-10-21 00:08:39 +00003019
showardc9ae1782009-01-30 01:42:37 +00003020
showard2fe3f1d2009-07-06 20:19:11 +00003021 def should_run_verify(self, queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003022 do_not_verify = (queue_entry.host.protection ==
3023 host_protections.Protection.DO_NOT_VERIFY)
3024 if do_not_verify:
3025 return False
3026 return self.run_verify
3027
3028
showard77182562009-06-10 00:16:05 +00003029 def get_pre_job_tasks(self, queue_entry):
3030 """
3031 Get a list of tasks to perform before the host_queue_entry
3032 may be used to run this Job (such as Cleanup & Verify).
3033
3034 @returns A list of tasks to be done to the given queue_entry before
3035 it should be considered be ready to run this job. The last
3036 task in the list calls HostQueueEntry.on_pending(), which
3037 continues the flow of the job.
3038 """
showard21baa452008-10-21 00:08:39 +00003039 tasks = []
showardc9ae1782009-01-30 01:42:37 +00003040 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00003041 tasks.append(CleanupTask(queue_entry=queue_entry))
showard2fe3f1d2009-07-06 20:19:11 +00003042 if self.should_run_verify(queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003043 tasks.append(VerifyTask(queue_entry=queue_entry))
3044 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00003045 return tasks
3046
3047
showardf1ae3542009-05-11 19:26:02 +00003048 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00003049 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00003050 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00003051 else:
showardf1ae3542009-05-11 19:26:02 +00003052 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00003053 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00003054 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00003055 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003056
3057 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00003058 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003059
3060
3061 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00003062 """
3063 @returns A tuple containing a list of HostQueueEntry instances to be
3064 used to run this Job, a string group name to suggest giving
showard54c1ea92009-05-20 00:32:58 +00003065 to this job in the results database.
showardf1ae3542009-05-11 19:26:02 +00003066 """
showard77182562009-06-10 00:16:05 +00003067 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00003068 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00003069 if atomic_group:
3070 num_entries_wanted = atomic_group.max_number_of_machines
3071 else:
3072 num_entries_wanted = self.synch_count
3073 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00003074
showardf1ae3542009-05-11 19:26:02 +00003075 if num_entries_wanted > 0:
3076 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00003077 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00003078 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00003079 params=(self.id, include_queue_entry.id)))
3080
3081 # Sort the chosen hosts by hostname before slicing.
3082 def cmp_queue_entries_by_hostname(entry_a, entry_b):
3083 return Host.cmp_for_sort(entry_a.host, entry_b.host)
3084 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
3085 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00003086
showardf1ae3542009-05-11 19:26:02 +00003087 # Sanity check. We'll only ever be called if this can be met.
3088 assert len(chosen_entries) >= self.synch_count
3089
3090 if atomic_group:
3091 # Look at any meta_host and dependency labels and pick the first
3092 # one that also specifies this atomic group. Use that label name
3093 # as the group name if possible (it is more specific).
3094 group_name = atomic_group.name
3095 for label in include_queue_entry.get_labels():
3096 if label.atomic_group_id:
3097 assert label.atomic_group_id == atomic_group.id
3098 group_name = label.name
3099 break
3100 else:
3101 group_name = ''
3102
3103 self._assign_new_group(chosen_entries, group_name=group_name)
3104 return chosen_entries, group_name
showard2bab8f42008-11-12 18:15:22 +00003105
3106
showard77182562009-06-10 00:16:05 +00003107 def run_if_ready(self, queue_entry):
3108 """
3109 @returns An Agent instance to ultimately run this job if enough hosts
3110 are ready for it to run.
3111 @returns None and potentially cleans up excess hosts if this Job
3112 is not ready to run.
3113 """
showardb2e2c322008-10-14 17:33:55 +00003114 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003115 self.stop_if_necessary()
3116 return None
mbligh36768f02008-02-22 18:28:33 +00003117
showard77182562009-06-10 00:16:05 +00003118 if queue_entry.atomic_group:
3119 return self.run_with_ready_delay(queue_entry)
3120
3121 return self.run(queue_entry)
3122
3123
3124 def run_with_ready_delay(self, queue_entry):
3125 """
3126 Start a delay to wait for more hosts to enter Pending state before
3127 launching an atomic group job. Once set, the a delay cannot be reset.
3128
3129 @param queue_entry: The HostQueueEntry object to get atomic group
3130 info from and pass to run_if_ready when the delay is up.
3131
3132 @returns An Agent to run the job as appropriate or None if a delay
3133 has already been set.
3134 """
3135 assert queue_entry.job_id == self.id
3136 assert queue_entry.atomic_group
3137 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3138 pending_threshold = queue_entry.atomic_group.max_number_of_machines
3139 over_max_threshold = (self._pending_count() >= pending_threshold)
3140 delay_expired = (self._delay_ready_task and
3141 time.time() >= self._delay_ready_task.end_time)
3142
3143 # Delay is disabled or we already have enough? Do not wait to run.
3144 if not delay or over_max_threshold or delay_expired:
3145 return self.run(queue_entry)
3146
3147 # A delay was previously scheduled.
3148 if self._delay_ready_task:
3149 return None
3150
3151 def run_job_after_delay():
3152 logging.info('Job %s done waiting for extra hosts.', self.id)
3153 return self.run(queue_entry)
3154
3155 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3156 callback=run_job_after_delay)
3157
3158 return Agent([self._delay_ready_task], num_processes=0)
3159
3160
3161 def run(self, queue_entry):
3162 """
3163 @param queue_entry: The HostQueueEntry instance calling this method.
3164 @returns An Agent instance to run this job or None if we've already
3165 been run.
3166 """
3167 if queue_entry.atomic_group and self._atomic_and_has_started():
3168 logging.error('Job.run() called on running atomic Job %d '
3169 'with HQE %s.', self.id, queue_entry)
3170 return None
showardf1ae3542009-05-11 19:26:02 +00003171 queue_entries, group_name = self._choose_group_to_run(queue_entry)
3172 return self._finish_run(queue_entries, group_name)
showardb2e2c322008-10-14 17:33:55 +00003173
3174
showardf1ae3542009-05-11 19:26:02 +00003175 def _finish_run(self, queue_entries, group_name):
showardb2ccdda2008-10-28 20:39:05 +00003176 for queue_entry in queue_entries:
3177 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00003178 params = self._get_autoserv_params(queue_entries)
3179 queue_task = QueueTask(job=self, queue_entries=queue_entries,
showardf1ae3542009-05-11 19:26:02 +00003180 cmd=params, group_name=group_name)
3181 tasks = [queue_task]
showard77182562009-06-10 00:16:05 +00003182 if self._delay_ready_task:
3183 # Cancel any pending callback that would try to run again
3184 # as we are already running.
3185 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003186
showard170873e2009-01-07 00:22:26 +00003187 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00003188
3189
mbligh36768f02008-02-22 18:28:33 +00003190if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003191 main()