#!/usr/bin/python -u

"""
Autotest scheduler
"""


import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
import itertools, logging, weakref
import common
import MySQLdb
from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.frontend import setup_django_environment
from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import monitor_db_cleanup
from autotest_lib.scheduler import status_server, scheduler_config


RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min

_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()


def _site_init_monitor_db_dummy():
    return {}


def main():
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled; set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        print msg
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
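    # e.g. notify_email_statuses = "Failed, Aborted" yields
    # ['failed', 'aborted']: split on whitespace/comma/semicolon/colon,
    # lowercased, with empty tokens dropped (example value illustrative).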
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
        logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def init():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
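    # 'drones' is a comma-separated hostname list, e.g.
    # "drone1.example.com, drone2.example.com" (hostnames illustrative).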
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
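    # Illustrative expansion (the '--foo' extra arg is hypothetical):
    #   _autoserv_command_line('host1,host2', ['--foo'], job=job)
    #     -> [_autoserv_path, '-p', '-m', 'host1,host2',
    #         '-r', drone_manager.WORKING_DIRECTORY,
    #         '-u', job.owner, '-l', job.name, '--verbose', '--foo']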
    autoserv_argv = [_autoserv_path, '-p', '-m', machines,
                     '-r', drone_manager.WORKING_DIRECTORY]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
    if verbose:
        autoserv_argv.append('--verbose')
    return autoserv_argv + extra_args


class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""


class HostScheduler(object):
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
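        # e.g. [1, 2, 3] -> '1,2,3', for interpolation into the
        # SQL 'IN (%s)' clauses used below.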
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
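        # Fold (left_id, right_id) rows into {left_id: set(right_ids)};
        # e.g. [(1, 10), (1, 11), (2, 10)] -> {1: set([10, 11]), 2: set([10])}
        # (the mapping is reversed when flip=True).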
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
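        # The job may use the host iff the job owner's ACL groups and the
        # host's ACL groups intersect.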
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels, if any, or None otherwise. Logs an error if more than one
        atomic group is found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing. Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling. Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to. We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend. Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts. The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []


class Dispatcher(object):
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
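        # One scheduler cycle: refresh drone state, run periodic cleanup,
        # process aborts, forced reverifies and recurring runs, schedule new
        # jobs, tick agents, then flush drone actions and queued emails.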
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._find_reverify()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _recover_processes(self):
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._check_for_remaining_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _register_pidfiles(self):
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
            where="status IN ('Running', 'Gathering', 'Parsing')")
        special_tasks = models.SpecialTask.objects.filter(is_active=True)
        for execution_entry in itertools.chain(queue_entries, special_tasks):
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                    execution_entry.execution_path(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)


    def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
        run_monitor = PidfileRunMonitor()
        run_monitor.attach_to_existing_process(execution_path,
                                               pidfile_name=pidfile_name)
        if run_monitor.has_process():
            orphans.discard(run_monitor.get_process())
            return run_monitor, '(process %s)' % run_monitor.get_process()
        return None, 'without process'


    def _recover_entries_with_status(self, status, orphans, pidfile_name,
                                     recover_entries_fn):
        queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
        for queue_entry in queue_entries:
            if self.get_agents_for_entry(queue_entry):
                # synchronous job we've already recovered
                continue
            group_entries = queue_entry.job.get_group_entries(queue_entry)
            run_monitor, process_string = self._get_recovery_run_monitor(
                queue_entry.execution_path(), pidfile_name, orphans)

            logging.info('Recovering %s entry %s %s', status.lower(),
                         ', '.join(str(entry) for entry in group_entries),
                         process_string)
            recover_entries_fn(queue_entry.job, group_entries, run_monitor)


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)
        raise RuntimeError(subject + '\n' + message)


    def _recover_running_entries(self, orphans):
        def recover_entries(job, queue_entries, run_monitor):
            if run_monitor is not None:
                queue_task = QueueTask(job=job, queue_entries=queue_entries,
                                       recover_run_monitor=run_monitor)
                self.add_agent(Agent(tasks=[queue_task],
                                     num_processes=len(queue_entries)))
            else:
                # we could do better, but this retains legacy behavior for now
                for queue_entry in queue_entries:
                    logging.info('Requeuing running HQE %s since it has no '
                                 'process' % queue_entry)
                    queue_entry.requeue()

        self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
                                          orphans, _AUTOSERV_PID_FILE,
                                          recover_entries)


    def _recover_gathering_entries(self, orphans):
        def recover_entries(job, queue_entries, run_monitor):
            gather_task = GatherLogsTask(job, queue_entries,
                                         recover_run_monitor=run_monitor)
            self.add_agent(Agent([gather_task]))

        self._recover_entries_with_status(
            models.HostQueueEntry.Status.GATHERING,
            orphans, _CRASHINFO_PID_FILE, recover_entries)


    def _recover_parsing_entries(self, orphans):
        def recover_entries(job, queue_entries, run_monitor):
            reparse_task = FinalReparseTask(queue_entries,
                                            recover_run_monitor=run_monitor)
            self.add_agent(Agent([reparse_task], num_processes=0))

        self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
                                          orphans, _PARSER_PID_FILE,
                                          recover_entries)


    def _recover_all_recoverable_entries(self):
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._recover_special_tasks(orphans)
        self._check_for_remaining_orphan_processes(orphans)


    def _recover_special_tasks(self, orphans):
        """\
        Recovers all special tasks that have started running but have not
        completed.
        """

        tasks = models.SpecialTask.objects.filter(is_active=True,
                                                  is_complete=False)
        # Use ordering to force NULL queue_entry_id's to the end of the list
        for task in tasks.order_by('-queue_entry_id'):
            assert not self.host_has_agent(task.host)

            host = Host(id=task.host.id)
            queue_entry = None
            if task.queue_entry:
                queue_entry = HostQueueEntry(id=task.queue_entry.id)

            run_monitor, process_string = self._get_recovery_run_monitor(
                task.execution_path(), _AUTOSERV_PID_FILE, orphans)

            logging.info('Recovering %s %s', task, process_string)
            self._recover_special_task(task, host, queue_entry, run_monitor)


    def _recover_special_task(self, task, host, queue_entry, run_monitor):
        """\
        Recovers a single special task.
        """
        if task.task == models.SpecialTask.Task.VERIFY:
            agent_tasks = self._recover_verify(task, host, queue_entry,
                                               run_monitor)
        elif task.task == models.SpecialTask.Task.REPAIR:
            agent_tasks = self._recover_repair(task, host, queue_entry,
                                               run_monitor)
        elif task.task == models.SpecialTask.Task.CLEANUP:
            agent_tasks = self._recover_cleanup(task, host, queue_entry,
                                                run_monitor)
        else:
            # Should never happen
            logging.error(
                "Special task id %d had invalid task %s", task.id, task.task)

        self.add_agent(Agent(agent_tasks))


    def _recover_verify(self, task, host, queue_entry, run_monitor):
        """\
        Recovers a verify task.
        No associated queue entry: Verify host
        With associated queue entry: Verify host, and run associated queue
                                     entry
        """
        if not task.queue_entry:
            return [VerifyTask(host=host, task=task,
                               recover_run_monitor=run_monitor)]
        else:
            return [VerifyTask(queue_entry=queue_entry, task=task,
                               recover_run_monitor=run_monitor),
                    SetEntryPendingTask(queue_entry=queue_entry)]


    def _recover_repair(self, task, host, queue_entry, run_monitor):
        """\
        Recovers a repair task.
        Always repair host
        """
        return [RepairTask(host=host, queue_entry=queue_entry, task=task,
                           recover_run_monitor=run_monitor)]


    def _recover_cleanup(self, task, host, queue_entry, run_monitor):
        """\
        Recovers a cleanup task.
        No associated queue entry: Clean host
        With associated queue entry: Clean host, verify host if needed, and
                                     run associated queue entry
        """
        if not task.queue_entry:
            return [CleanupTask(host=host, task=task,
                                recover_run_monitor=run_monitor)]
        else:
            agent_tasks = [CleanupTask(queue_entry=queue_entry,
                                       task=task,
                                       recover_run_monitor=run_monitor)]
            if queue_entry.job.should_run_verify(queue_entry):
                agent_tasks.append(VerifyTask(queue_entry=queue_entry))
            agent_tasks.append(
                SetEntryPendingTask(queue_entry=queue_entry))
            return agent_tasks


    def _check_for_remaining_active_entries(self):
        queue_entries = HostQueueEntry.fetch(
            where='active AND NOT complete AND '
                  '(aborted OR status != "Pending")')

        message = '\n'.join(str(entry) for entry in queue_entries
                            if not self.get_agents_for_entry(entry))
        if message:
            email_manager.manager.enqueue_notify_email(
                'Unrecovered active host queue entries exist',
                message)


    def _find_reverify(self):
        tasks = models.SpecialTask.objects.filter(
            task=models.SpecialTask.Task.VERIFY, is_active=False,
            is_complete=False, queue_entry__isnull=True)

        for task in tasks:
            host = Host.fetch(where='id = %s', params=(task.host.id,)).next()
            if host.locked or host.invalid or self.host_has_agent(host):
                continue

            logging.info('Force reverifying host %s', host.hostname)
            self.add_agent(Agent([VerifyTask(host=host, task=task)]))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
            "status IN ('Repairing', 'Verifying', 'Cleaning', 'Running')",
            print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            tasks = host.reverify_tasks()
            self.add_agent(Agent(tasks))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
            queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))
        # The first assigned host uses the original HostQueueEntry
        group_queue_entries = [queue_entry]
        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            group_queue_entries.append(new_hqe)
        assert len(group_queue_entries) == len(group_hosts)
        for queue_entry, host in itertools.izip(group_queue_entries,
                                                group_hosts):
            self._run_queue_entry(queue_entry, host)


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        for queue_entry in queue_entries:
            if (queue_entry.atomic_group_id is None or
                queue_entry.host_id is not None):
                assigned_host = self._host_scheduler.find_eligible_host(
                    queue_entry)
                if assigned_host:
                    self._run_queue_entry(queue_entry, assigned_host)
            else:
                self._schedule_atomic_group(queue_entry)


    def _run_queue_entry(self, queue_entry, host):
        agent = queue_entry.run_pre_job_tasks(assigned_host=host)
        self.add_agent(agent)


    def _find_aborting(self):
        for entry in HostQueueEntry.fetch(where='aborted and not complete'):
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        if agent.num_processes > _drone_manager.max_runnable_processes():
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
            scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True


    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                # TODO: send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0:  # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """
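
    # Typical lifecycle (illustrative sketch):
    #   monitor = PidfileRunMonitor()
    #   monitor.run(command, working_directory)              # start a process
    #   # ...or, during recovery:
    #   monitor.attach_to_existing_process(execution_path)
    #   # then poll monitor.has_process() / monitor.get_process(), or kill().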

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
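        # e.g. _add_nice_command(['autoserv', '-p'], 10)
        #   -> ['nice', '-n', '10', 'autoserv', '-p']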
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            log_file=log_file, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_path, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
jadmanski0afbb632008-06-06 21:10:57 +00001244 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001245 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001246
1247
showard35162b02009-03-03 02:17:30 +00001248 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001249 """\
1250 Called when autoserv has exited without writing an exit status,
1251 or we've timed out waiting for autoserv to write a pid to the
1252 pidfile. In either case, we just return failure and the caller
1253 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001254
showard170873e2009-01-07 00:22:26 +00001255 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001256 """
1257 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001258 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001259 self._state.exit_status = 1
1260 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001261
1262
jadmanski0afbb632008-06-06 21:10:57 +00001263 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001264 self._get_pidfile_info()
1265 return self._state.exit_status
1266
1267
1268 def num_tests_failed(self):
1269 self._get_pidfile_info()
1270 assert self._state.num_tests_failed is not None
1271 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001272
1273
mbligh36768f02008-02-22 18:28:33 +00001274class Agent(object):
showard77182562009-06-10 00:16:05 +00001275 """
1276 An agent for use by the Dispatcher class to perform a sequence of tasks.
1277
1278 The following methods are required on all task objects:
1279 poll() - Called periodically to let the task check its status and
1280 update its internal state.
1281 is_done() - Returns True if the task is finished.
1282 abort() - Called when an abort has been requested. The task must
1283 set its aborted attribute to True if it actually aborted.
1284
1285 The following attributes are required on all task objects:
1286 aborted - bool, True if this task was aborted.
1287 failure_tasks - A sequence of tasks to be run using a new Agent
1288 by the dispatcher should this task fail.
1289 success - bool, True if this task succeeded.
1290 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1291 host_ids - A sequence of Host ids this task represents.
1292
1293 The following attribute is written to all task objects:
1294 agent - A reference to the Agent instance that the task has been
1295 added to.
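
Illustrative usage (a sketch; verify_task and queue_task stand in for
real task objects):

    agent = Agent([verify_task, queue_task], num_processes=1)
    dispatcher.add_agent(agent)  # the dispatcher then polls agent.tick()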
1296 """
1297
1298
showard170873e2009-01-07 00:22:26 +00001299 def __init__(self, tasks, num_processes=1):
showard77182562009-06-10 00:16:05 +00001300 """
1301 @param tasks: A list of tasks as described in the class docstring.
1302 @param num_processes: The number of subprocesses the Agent represents.
1303 This is used by the Dispatcher for managing the load on the
1304 system. Defaults to 1.
1305 """
jadmanski0afbb632008-06-06 21:10:57 +00001306 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001307 self.queue = None
showard77182562009-06-10 00:16:05 +00001308 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001309 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001310 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001311
showard170873e2009-01-07 00:22:26 +00001312 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1313 for task in tasks)
1314 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1315
showardd3dc1992009-04-22 21:01:40 +00001316 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001317 for task in tasks:
1318 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001319
1320
showardd3dc1992009-04-22 21:01:40 +00001321 def _clear_queue(self):
1322 self.queue = Queue.Queue(0)
1323
1324
showard170873e2009-01-07 00:22:26 +00001325 def _union_ids(self, id_lists):
1326 return set(itertools.chain(*id_lists))
1327
1328
jadmanski0afbb632008-06-06 21:10:57 +00001329 def add_task(self, task):
1330 self.queue.put_nowait(task)
1331 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001332
1333
jadmanski0afbb632008-06-06 21:10:57 +00001334 def tick(self):
showard21baa452008-10-21 00:08:39 +00001335 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001336 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001337 self.active_task.poll()
1338 if not self.active_task.is_done():
1339 return
1340 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001341
1342
jadmanski0afbb632008-06-06 21:10:57 +00001343 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001344 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001345 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001346 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001347 if not self.active_task.success:
1348 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001349 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001350
jadmanski0afbb632008-06-06 21:10:57 +00001351 if not self.is_done():
1352 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001353
1354
jadmanski0afbb632008-06-06 21:10:57 +00001355 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001356 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001357 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1358 # get reset.
1359 new_agent = Agent(self.active_task.failure_tasks)
1360 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001361
mblighe2586682008-02-29 22:45:46 +00001362
showard4c5374f2008-09-04 17:02:56 +00001363 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001364 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001365
1366
jadmanski0afbb632008-06-06 21:10:57 +00001367 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001368 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001369
1370
showardd3dc1992009-04-22 21:01:40 +00001371 def abort(self):
showard08a36412009-05-05 01:01:13 +00001372 # abort tasks until the queue is empty or a task ignores the abort
1373 while not self.is_done():
1374 if not self.active_task:
1375 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001376 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001377 if not self.active_task.aborted:
1378 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001379 return
1380 self.active_task = None
1381
showardd3dc1992009-04-22 21:01:40 +00001382
showard77182562009-06-10 00:16:05 +00001383class DelayedCallTask(object):
1384 """
1385 A task object like AgentTask for an Agent to run that waits for the
1386 specified amount of time to have elapsed before calling the supplied
1387 callback once and finishing. If the callback returns anything, it is
1388 assumed to be a new Agent instance and will be added to the dispatcher.
1389
1390 @attribute end_time: The absolute posix time after which this task will
1391 call its callback when it is polled and be finished.
1392
1393 Also has all attributes required by the Agent class.
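
Illustrative usage (a sketch; _delayed_callback is hypothetical):

    task = DelayedCallTask(delay_seconds=30,
                           callback=self._delayed_callback)
    dispatcher.add_agent(Agent([task], num_processes=0))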
1394 """
1395 def __init__(self, delay_seconds, callback, now_func=None):
1396 """
1397 @param delay_seconds: The delay in seconds from now that this task
1398 will call the supplied callback and be done.
1399 @param callback: A callable to be called by this task once after at
1400 least delay_seconds time has elapsed. It must return None
1401 or a new Agent instance.
1402 @param now_func: A time.time like function. Default: time.time.
1403 Used for testing.
1404 """
1405 assert delay_seconds > 0
1406 assert callable(callback)
1407 if not now_func:
1408 now_func = time.time
1409 self._now_func = now_func
1410 self._callback = callback
1411
1412 self.end_time = self._now_func() + delay_seconds
1413
1414 # These attributes are required by Agent.
1415 self.aborted = False
1416 self.failure_tasks = ()
1417 self.host_ids = ()
1418 self.success = False
1419 self.queue_entry_ids = ()
1420 # This is filled in by Agent.add_task().
1421 self.agent = None
1422
1423
1424 def poll(self):
1425 if self._callback and self._now_func() >= self.end_time:
1426 new_agent = self._callback()
1427 if new_agent:
1428 self.agent.dispatcher.add_agent(new_agent)
1429 self._callback = None
1430 self.success = True
1431
1432
1433 def is_done(self):
1434 return not self._callback
1435
1436
1437 def abort(self):
1438 self.aborted = True
1439 self._callback = None
1440
1441
mbligh36768f02008-02-22 18:28:33 +00001442class AgentTask(object):
showarded2afea2009-07-07 20:54:07 +00001443 def __init__(self, cmd=None, working_directory=None, failure_tasks=(),
1444 pidfile_name=None, paired_with_pidfile=None,
1445 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001446 self.done = False
1447 self.failure_tasks = failure_tasks
jadmanski0afbb632008-06-06 21:10:57 +00001448 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001449 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001450 self.agent = None
showarded2afea2009-07-07 20:54:07 +00001451 self.monitor = recover_run_monitor
1452 self.started = bool(recover_run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +00001453 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001454 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001455 self.queue_entry_ids = []
1456 self.host_ids = []
1457 self.log_file = None
1458
1459
1460 def _set_ids(self, host=None, queue_entries=None):
1461 if queue_entries and queue_entries != [None]:
1462 self.host_ids = [entry.host.id for entry in queue_entries]
1463 self.queue_entry_ids = [entry.id for entry in queue_entries]
1464 else:
1465 assert host
1466 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001467
1468
jadmanski0afbb632008-06-06 21:10:57 +00001469 def poll(self):
showard08a36412009-05-05 01:01:13 +00001470 if not self.started:
1471 self.start()
1472 self.tick()
1473
1474
1475 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001476 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001477 exit_code = self.monitor.exit_code()
1478 if exit_code is None:
1479 return
1480 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001481 else:
1482 success = False
mbligh36768f02008-02-22 18:28:33 +00001483
jadmanski0afbb632008-06-06 21:10:57 +00001484 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001485
1486
jadmanski0afbb632008-06-06 21:10:57 +00001487 def is_done(self):
1488 return self.done
mbligh36768f02008-02-22 18:28:33 +00001489
1490
jadmanski0afbb632008-06-06 21:10:57 +00001491 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001492 if self.done:
1493 return
jadmanski0afbb632008-06-06 21:10:57 +00001494 self.done = True
1495 self.success = success
1496 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001497
1498
jadmanski0afbb632008-06-06 21:10:57 +00001499 def prolog(self):
showarded2afea2009-07-07 20:54:07 +00001500 assert not self.monitor
mblighd64e5702008-04-04 21:39:28 +00001501
mbligh36768f02008-02-22 18:28:33 +00001502
jadmanski0afbb632008-06-06 21:10:57 +00001503 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001504 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001505 _drone_manager.copy_to_results_repository(
1506 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001507
1508
jadmanski0afbb632008-06-06 21:10:57 +00001509 def epilog(self):
1510 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001511
1512
jadmanski0afbb632008-06-06 21:10:57 +00001513 def start(self):
1514 assert self.agent
1515
1516 if not self.started:
1517 self.prolog()
1518 self.run()
1519
1520 self.started = True
1521
1522
1523 def abort(self):
1524 if self.monitor:
1525 self.monitor.kill()
1526 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001527 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001528 self.cleanup()
1529
1530
showarded2afea2009-07-07 20:54:07 +00001531 def _get_consistent_execution_path(self, execution_entries):
1532 first_execution_path = execution_entries[0].execution_path()
1533 for execution_entry in execution_entries[1:]:
1534 assert execution_entry.execution_path() == first_execution_path, (
1535 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1536 execution_entry,
1537 first_execution_path,
1538 execution_entries[0]))
1539 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001540
1541
showarded2afea2009-07-07 20:54:07 +00001542 def _copy_results(self, execution_entries, use_monitor=None):
1543 """
1544 @param execution_entries: list of objects with execution_path() method
1545 """
1546 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001547 if use_monitor is None:
1548 assert self.monitor
1549 use_monitor = self.monitor
1550 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001551 execution_path = self._get_consistent_execution_path(execution_entries)
1552 results_path = execution_path + '/'
showard6b733412009-04-27 20:09:18 +00001553 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001554 results_path)
showardde634ee2009-01-30 01:44:24 +00001555
showarda1e74b32009-05-12 17:32:04 +00001556
1557 def _parse_results(self, queue_entries):
showardde634ee2009-01-30 01:44:24 +00001558 reparse_task = FinalReparseTask(queue_entries)
1559 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1560
1561
showarda1e74b32009-05-12 17:32:04 +00001562 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1563 self._copy_results(queue_entries, use_monitor)
1564 self._parse_results(queue_entries)
1565
1566
showardd3dc1992009-04-22 21:01:40 +00001567 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
showarded2afea2009-07-07 20:54:07 +00001568 assert not self.monitor
jadmanski0afbb632008-06-06 21:10:57 +00001569 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001570 self.monitor = PidfileRunMonitor()
1571 self.monitor.run(self.cmd, self._working_directory,
1572 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001573 log_file=self.log_file,
1574 pidfile_name=pidfile_name,
1575 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001576
1577
showardd9205182009-04-27 20:09:55 +00001578class TaskWithJobKeyvals(object):
1579 """AgentTask mixin providing functionality to help with job keyval files."""
1580 _KEYVAL_FILE = 'keyval'
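# A keyval file holds one "key=value" pair per line; e.g. the methods
# below produce entries like (timestamps illustrative):
#
#     job_queued=1246406400
#     job_finished=1246410000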
1581 def _format_keyval(self, key, value):
1582 return '%s=%s' % (key, value)
1583
1584
1585 def _keyval_path(self):
1586 """Subclasses must override this"""
1587 raise NotImplementedError
1588
1589
1590 def _write_keyval_after_job(self, field, value):
1591 assert self.monitor
1592 if not self.monitor.has_process():
1593 return
1594 _drone_manager.write_lines_to_file(
1595 self._keyval_path(), [self._format_keyval(field, value)],
1596 paired_with_process=self.monitor.get_process())
1597
1598
1599 def _job_queued_keyval(self, job):
1600 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1601
1602
1603 def _write_job_finished(self):
1604 self._write_keyval_after_job("job_finished", int(time.time()))
1605
1606
showarded2afea2009-07-07 20:54:07 +00001607class SpecialAgentTask(AgentTask):
1608 """
1609 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
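
Concrete subclasses in this module include RepairTask, plus VerifyTask
and CleanupTask via PreJobTask.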
1610 """
1611
1612 TASK_TYPE = None
1613 host = None
1614 queue_entry = None
1615
1616 def __init__(self, task, extra_command_args, **kwargs):
1617 assert self.host
1618 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
1620 self.task = task
showard8ac6f2a2009-07-16 14:50:32 +00001621 if task:
1622 kwargs['working_directory'] = task.execution_path()
showarded2afea2009-07-07 20:54:07 +00001623 self._extra_command_args = extra_command_args
1624 super(SpecialAgentTask, self).__init__(**kwargs)
1625
1626
1627 def prolog(self):
1628 super(SpecialAgentTask, self).prolog()
1629 self.task = models.SpecialTask.prepare(self, self.task)
1630 self.cmd = _autoserv_command_line(self.host.hostname,
1631 self._extra_command_args,
1632 queue_entry=self.queue_entry)
1633 self._working_directory = self.task.execution_path()
1634 self.task.activate()
1635
1636
showardb6681aa2009-07-08 21:15:00 +00001637 def cleanup(self):
1638 super(SpecialAgentTask, self).cleanup()
showarda640b2d2009-07-20 22:37:06 +00001639
1640 # self.task can be None if a SpecialAgentTask is aborted before the
1641 # prolog runs
1642 if self.task:
1643 self.task.finish()
1644
1645 if self.monitor and self.monitor.has_process() and self.task:
showarded2afea2009-07-07 20:54:07 +00001646 self._copy_results([self.task])
1647
1648
1649class RepairTask(SpecialAgentTask, TaskWithJobKeyvals):
1650 TASK_TYPE = models.SpecialTask.Task.REPAIR
1651
1652
1653 def __init__(self, host, queue_entry=None, task=None,
1654 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001655 """\
showard170873e2009-01-07 00:22:26 +00001656 queue_entry: queue entry to mark failed if this repair fails.
jadmanski0afbb632008-06-06 21:10:57 +00001657 """
jadmanskifb7cfb12008-07-09 14:13:21 +00001658 protection = host_protections.Protection.get_string(host.protection)
jadmanski542537f2008-07-24 14:14:56 +00001659 # normalize the protection name
1660 protection = host_protections.Protection.get_attr_name(protection)
showard170873e2009-01-07 00:22:26 +00001661
jadmanski0afbb632008-06-06 21:10:57 +00001662 self.host = host
showardcfd4a7e2009-07-11 01:47:33 +00001663 self.queue_entry = None
1664 # recovery code can pass a HQE that's already been requeued. for a
1665 # metahost, that means the host has been unassigned. in that case,
1666 # ignore the HQE.
1667 hqe_still_assigned_to_this_host = (queue_entry and queue_entry.host
1668 and queue_entry.host.id == host.id)
1669 if hqe_still_assigned_to_this_host:
1670 self.queue_entry = queue_entry
showard170873e2009-01-07 00:22:26 +00001671
showarded2afea2009-07-07 20:54:07 +00001672 super(RepairTask, self).__init__(
1673 task, ['-R', '--host-protection', protection],
1674 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00001675
showard2fe3f1d2009-07-06 20:19:11 +00001676 # *don't* include the queue entry in IDs -- if the queue entry is
1677 # aborted, we want to leave the repair task running
1678 self._set_ids(host=host)
1679
mbligh36768f02008-02-22 18:28:33 +00001680
jadmanski0afbb632008-06-06 21:10:57 +00001681 def prolog(self):
showarded2afea2009-07-07 20:54:07 +00001682 super(RepairTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001683 logging.info("repair_task starting")
jadmanski0afbb632008-06-06 21:10:57 +00001684 self.host.set_status('Repairing')
showard2fe3f1d2009-07-06 20:19:11 +00001685 if self.queue_entry:
1686 self.queue_entry.requeue()
1687
mbligh36768f02008-02-22 18:28:33 +00001688
showardd9205182009-04-27 20:09:55 +00001689 def _keyval_path(self):
showarded2afea2009-07-07 20:54:07 +00001690 return os.path.join(self._working_directory, self._KEYVAL_FILE)
showardd9205182009-04-27 20:09:55 +00001691
1692
showardde634ee2009-01-30 01:44:24 +00001693 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001694 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001695
showard2fe3f1d2009-07-06 20:19:11 +00001696 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001697 return # don't fail metahost entries, they'll be reassigned
1698
showard2fe3f1d2009-07-06 20:19:11 +00001699 self.queue_entry.update_from_database()
1700 if self.queue_entry.status != 'Queued':
showardccbd6c52009-03-21 00:10:21 +00001701 return # entry has been aborted
1702
showard2fe3f1d2009-07-06 20:19:11 +00001703 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001704 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001705 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001706 self._write_keyval_after_job(queued_key, queued_time)
1707 self._write_job_finished()
showard678df4f2009-02-04 21:36:39 +00001708 # copy results logs into the normal place for job results
1709 _drone_manager.copy_results_on_drone(
1710 self.monitor.get_process(),
showarded2afea2009-07-07 20:54:07 +00001711 source_path=self._working_directory + '/',
1712 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001713
showard2fe3f1d2009-07-06 20:19:11 +00001714 self._copy_results([self.queue_entry])
1715 if self.queue_entry.job.parse_failed_repair:
1716 self._parse_results([self.queue_entry])
1717 self.queue_entry.handle_host_failure()
showardde634ee2009-01-30 01:44:24 +00001718
1719
jadmanski0afbb632008-06-06 21:10:57 +00001720 def epilog(self):
1721 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001722
jadmanski0afbb632008-06-06 21:10:57 +00001723 if self.success:
1724 self.host.set_status('Ready')
1725 else:
1726 self.host.set_status('Repair Failed')
showard2fe3f1d2009-07-06 20:19:11 +00001727 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001728 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001729
1730
showarded2afea2009-07-07 20:54:07 +00001731class PreJobTask(SpecialAgentTask):
showard170873e2009-01-07 00:22:26 +00001732 def epilog(self):
1733 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001734 should_copy_results = (self.queue_entry and not self.success
1735 and not self.queue_entry.meta_host)
1736 if should_copy_results:
1737 self.queue_entry.set_execution_subdir()
showarded2afea2009-07-07 20:54:07 +00001738 log_name = os.path.basename(self.task.execution_path())
1739 source = os.path.join(self.task.execution_path(), 'debug',
1740 'autoserv.DEBUG')
1741 destination = os.path.join(self.queue_entry.execution_path(),
1742 log_name)
showard170873e2009-01-07 00:22:26 +00001743 _drone_manager.copy_to_results_repository(
showarded2afea2009-07-07 20:54:07 +00001744 self.monitor.get_process(), source,
showard170873e2009-01-07 00:22:26 +00001745 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001746
1747
1748class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001749 TASK_TYPE = models.SpecialTask.Task.VERIFY
1750
1751
1752 def __init__(self, queue_entry=None, host=None, task=None,
1753 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001754 assert bool(queue_entry) != bool(host)
jadmanski0afbb632008-06-06 21:10:57 +00001755 self.host = host or queue_entry.host
1756 self.queue_entry = queue_entry
mbligh36768f02008-02-22 18:28:33 +00001757
showarde788ea62008-11-17 21:02:47 +00001758 failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
showarded2afea2009-07-07 20:54:07 +00001759 super(VerifyTask, self).__init__(
1760 task, ['-v'], failure_tasks=failure_tasks,
1761 recover_run_monitor=recover_run_monitor)
mblighe2586682008-02-29 22:45:46 +00001762
showard170873e2009-01-07 00:22:26 +00001763 self._set_ids(host=host, queue_entries=[queue_entry])
mblighe2586682008-02-29 22:45:46 +00001764
1765
jadmanski0afbb632008-06-06 21:10:57 +00001766 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001767 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001768
showardb18134f2009-03-20 20:52:18 +00001769 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001770 if self.queue_entry:
1771 self.queue_entry.set_status('Verifying')
jadmanski0afbb632008-06-06 21:10:57 +00001772 self.host.set_status('Verifying')
mbligh36768f02008-02-22 18:28:33 +00001773
showarded2afea2009-07-07 20:54:07 +00001774 # Delete any other queued verifies for this host. One verify will do
1775 # and there's no need to keep records of other requests.
1776 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00001777 host__id=self.host.id,
1778 task=models.SpecialTask.Task.VERIFY,
showarded2afea2009-07-07 20:54:07 +00001779 is_active=False, is_complete=False)
1780 queued_verifies = queued_verifies.exclude(id=self.task.id)
1781 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00001782
mbligh36768f02008-02-22 18:28:33 +00001783
jadmanski0afbb632008-06-06 21:10:57 +00001784 def epilog(self):
1785 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001786 if self.success:
jadmanski0afbb632008-06-06 21:10:57 +00001787 self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001788
1789
showardb5626452009-06-30 01:57:28 +00001790class CleanupHostsMixin(object):
1791 def _reboot_hosts(self, job, queue_entries, final_success,
1792 num_tests_failed):
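# note: _final_status is assumed to come from the consuming class
# (PostJobTask defines it); any other class mixing this in must
# provide it before _reboot_hosts() is called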
1793 reboot_after = job.reboot_after
1794 do_reboot = (
1795 # always reboot after aborted jobs
1796 self._final_status == models.HostQueueEntry.Status.ABORTED
1797 or reboot_after == models.RebootAfter.ALWAYS
1798 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
1799 and final_success and num_tests_failed == 0))
1800
1801 for queue_entry in queue_entries:
1802 if do_reboot:
1803 # don't pass the queue entry to the CleanupTask. if the cleanup
1804 # fails, the job doesn't care -- it's over.
1805 cleanup_task = CleanupTask(host=queue_entry.host)
1806 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
1807 else:
1808 queue_entry.host.set_status('Ready')
1809
1810
1811class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
showarded2afea2009-07-07 20:54:07 +00001812 def __init__(self, job, queue_entries, cmd=None, group_name='',
1813 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001814 self.job = job
1815 self.queue_entries = queue_entries
showardf1ae3542009-05-11 19:26:02 +00001816 self.group_name = group_name
showarded2afea2009-07-07 20:54:07 +00001817 super(QueueTask, self).__init__(
1818 cmd, self._execution_path(),
1819 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00001820 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001821
1822
showard73ec0442009-02-07 02:05:20 +00001823 def _keyval_path(self):
showarded2afea2009-07-07 20:54:07 +00001824 return os.path.join(self._execution_path(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001825
1826
1827 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1828 keyval_contents = '\n'.join(self._format_keyval(key, value)
1829 for key, value in keyval_dict.iteritems())
1830 # always end with a newline to allow additional keyvals to be written
1831 keyval_contents += '\n'
showarded2afea2009-07-07 20:54:07 +00001832 _drone_manager.attach_file_to_execution(self._execution_path(),
showard73ec0442009-02-07 02:05:20 +00001833 keyval_contents,
1834 file_path=keyval_path)
1835
1836
1837 def _write_keyvals_before_job(self, keyval_dict):
1838 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1839
1840
showard170873e2009-01-07 00:22:26 +00001841 def _write_host_keyvals(self, host):
showarded2afea2009-07-07 20:54:07 +00001842 keyval_path = os.path.join(self._execution_path(), 'host_keyvals',
showard170873e2009-01-07 00:22:26 +00001843 host.hostname)
1844 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001845 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1846 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001847
1848
showarded2afea2009-07-07 20:54:07 +00001849 def _execution_path(self):
1850 return self.queue_entries[0].execution_path()
mblighbb421852008-03-11 22:36:16 +00001851
1852
jadmanski0afbb632008-06-06 21:10:57 +00001853 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001854 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001855 keyval_dict = {queued_key: queued_time}
1856 if self.group_name:
1857 keyval_dict['host_group_name'] = self.group_name
1858 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001859 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001860 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001861 queue_entry.set_status('Running')
showard12f3e322009-05-13 21:27:42 +00001862 queue_entry.update_field('started_on', datetime.datetime.now())
jadmanski0afbb632008-06-06 21:10:57 +00001863 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001864 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001865 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001866 assert len(self.queue_entries) == 1
1867 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001868
1869
showard35162b02009-03-03 02:17:30 +00001870 def _write_lost_process_error_file(self):
showarded2afea2009-07-07 20:54:07 +00001871 error_file_path = os.path.join(self._execution_path(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001872 _drone_manager.write_lines_to_file(error_file_path,
1873 [_LOST_PROCESS_ERROR])
1874
1875
showardd3dc1992009-04-22 21:01:40 +00001876 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001877 if not self.monitor:
1878 return
1879
showardd9205182009-04-27 20:09:55 +00001880 self._write_job_finished()
1881
showardd3dc1992009-04-22 21:01:40 +00001882 # both of these conditionals can be true iff the process ran, wrote a
1883 # pid to its pidfile, and then exited without writing an exit code
showard35162b02009-03-03 02:17:30 +00001884 if self.monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +00001885 gather_task = GatherLogsTask(self.job, self.queue_entries)
1886 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showardb5626452009-06-30 01:57:28 +00001887 else:
1888 self._reboot_hosts(self.job, self.queue_entries,
1889 final_success=False, num_tests_failed=0)
showard35162b02009-03-03 02:17:30 +00001890
1891 if self.monitor.lost_process:
1892 self._write_lost_process_error_file()
1893 for queue_entry in self.queue_entries:
1894 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001895
1896
showardcbd74612008-11-19 21:42:02 +00001897 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001898 _drone_manager.write_lines_to_file(
showarded2afea2009-07-07 20:54:07 +00001899 os.path.join(self._execution_path(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001900 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001901 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001902
1903
jadmanskif7fa2cc2008-10-01 14:13:23 +00001904 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001905 if not self.monitor or not self.monitor.has_process():
1906 return
1907
jadmanskif7fa2cc2008-10-01 14:13:23 +00001908 # build up sets of all the aborted_by and aborted_on values
1909 aborted_by, aborted_on = set(), set()
1910 for queue_entry in self.queue_entries:
1911 if queue_entry.aborted_by:
1912 aborted_by.add(queue_entry.aborted_by)
1913 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1914 aborted_on.add(t)
1915
1916 # extract some actual, unique aborted by value and write it out
1917 assert len(aborted_by) <= 1
1918 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001919 aborted_by_value = aborted_by.pop()
1920 aborted_on_value = max(aborted_on)
1921 else:
1922 aborted_by_value = 'autotest_system'
1923 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001924
showarda0382352009-02-11 23:36:43 +00001925 self._write_keyval_after_job("aborted_by", aborted_by_value)
1926 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001927
showardcbd74612008-11-19 21:42:02 +00001928 aborted_on_string = str(datetime.datetime.fromtimestamp(
1929 aborted_on_value))
1930 self._write_status_comment('Job aborted by %s on %s' %
1931 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001932
1933
jadmanski0afbb632008-06-06 21:10:57 +00001934 def abort(self):
1935 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001936 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001937 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001938
1939
jadmanski0afbb632008-06-06 21:10:57 +00001940 def epilog(self):
1941 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001942 self._finish_task()
1943 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001944
1945
showardd3dc1992009-04-22 21:01:40 +00001946class PostJobTask(AgentTask):
1947 def __init__(self, queue_entries, pidfile_name, logfile_name,
showarded2afea2009-07-07 20:54:07 +00001948 recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00001949 self._queue_entries = queue_entries
1950 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00001951
showarded2afea2009-07-07 20:54:07 +00001952 self._execution_path = self._get_consistent_execution_path(
1953 queue_entries)
1954 self._results_dir = _drone_manager.absolute_path(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00001955 self._autoserv_monitor = PidfileRunMonitor()
showarded2afea2009-07-07 20:54:07 +00001956 self._autoserv_monitor.attach_to_existing_process(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00001957 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1958
1959 if _testing_mode:
1960 command = 'true'
1961 else:
1962 command = self._generate_command(self._results_dir)
1963
showarded2afea2009-07-07 20:54:07 +00001964 super(PostJobTask, self).__init__(
1965 cmd=command, working_directory=self._execution_path,
1966 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00001967
showarded2afea2009-07-07 20:54:07 +00001968 self.log_file = os.path.join(self._execution_path, logfile_name)
showardd3dc1992009-04-22 21:01:40 +00001969 self._final_status = self._determine_final_status()
1970
1971
1972 def _generate_command(self, results_dir):
1973 raise NotImplementedError('Subclasses must override this')
1974
1975
1976 def _job_was_aborted(self):
1977 was_aborted = None
1978 for queue_entry in self._queue_entries:
1979 queue_entry.update_from_database()
1980 if was_aborted is None: # first queue entry
1981 was_aborted = bool(queue_entry.aborted)
1982 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1983 email_manager.manager.enqueue_notify_email(
1984 'Inconsistent abort state',
1985 'Queue entries have inconsistent abort state: ' +
1986 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
1987 # don't crash here, just assume true
1988 return True
1989 return was_aborted
1990
1991
1992 def _determine_final_status(self):
1993 if self._job_was_aborted():
1994 return models.HostQueueEntry.Status.ABORTED
1995
1996 # we'll use a PidfileRunMonitor to read the autoserv exit status
1997 if self._autoserv_monitor.exit_code() == 0:
1998 return models.HostQueueEntry.Status.COMPLETED
1999 return models.HostQueueEntry.Status.FAILED
2000
2001
2002 def run(self):
showard5add1c82009-05-26 19:27:46 +00002003 # make sure we actually have results to work with.
2004 # this should never happen in normal operation.
2005 if not self._autoserv_monitor.has_process():
2006 email_manager.manager.enqueue_notify_email(
2007 'No results in post-job task',
2008 'No results in post-job task at %s' %
2009 self._autoserv_monitor.pidfile_id)
2010 self.finished(False)
2011 return
2012
2013 super(PostJobTask, self).run(
2014 pidfile_name=self._pidfile_name,
2015 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00002016
2017
2018 def _set_all_statuses(self, status):
2019 for queue_entry in self._queue_entries:
2020 queue_entry.set_status(status)
2021
2022
2023 def abort(self):
2024 # override AgentTask.abort() to avoid killing the process and ending
2025 # the task. post-job tasks continue when the job is aborted.
2026 pass
2027
2028
showardb5626452009-06-30 01:57:28 +00002029class GatherLogsTask(PostJobTask, CleanupHostsMixin):
showardd3dc1992009-04-22 21:01:40 +00002030 """
2031 Task responsible for
2032 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2033 * copying logs to the results repository
2034 * spawning CleanupTasks for hosts, if necessary
2035 * spawning a FinalReparseTask for the job
2036 """
showarded2afea2009-07-07 20:54:07 +00002037 def __init__(self, job, queue_entries, recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002038 self._job = job
2039 super(GatherLogsTask, self).__init__(
2040 queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
showarded2afea2009-07-07 20:54:07 +00002041 logfile_name='.collect_crashinfo.log',
2042 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002043 self._set_ids(queue_entries=queue_entries)
2044
2045
2046 def _generate_command(self, results_dir):
2047 host_list = ','.join(queue_entry.host.hostname
2048 for queue_entry in self._queue_entries)
2049 return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
2050 '-r', results_dir]
2051
2052
2053 def prolog(self):
2054 super(GatherLogsTask, self).prolog()
2055 self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)
2056
2057
showardd3dc1992009-04-22 21:01:40 +00002058 def epilog(self):
2059 super(GatherLogsTask, self).epilog()
showardebc0fb72009-05-13 21:28:07 +00002060 if self._autoserv_monitor.has_process():
2061 self._copy_and_parse_results(self._queue_entries,
2062 use_monitor=self._autoserv_monitor)
showardb5626452009-06-30 01:57:28 +00002063
2064 final_success = (
2065 self._final_status == models.HostQueueEntry.Status.COMPLETED)
2066 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2067 self._reboot_hosts(self._job, self._queue_entries, final_success,
2068 num_tests_failed)
showardd3dc1992009-04-22 21:01:40 +00002069
2070
showard0bbfc212009-04-29 21:06:13 +00002071 def run(self):
showard597bfd32009-05-08 18:22:50 +00002072 autoserv_exit_code = self._autoserv_monitor.exit_code()
2073 # only run if Autoserv exited due to some signal. if we have no exit
2074 # code, assume something bad (and signal-like) happened.
2075 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002076 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002077 else:
2078 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002079
2080
showard8fe93b52008-11-18 17:53:22 +00002081class CleanupTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002082 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2083
2084
2085 def __init__(self, host=None, queue_entry=None, task=None,
2086 recover_run_monitor=None):
showardfa8629c2008-11-04 16:51:23 +00002087 assert bool(host) ^ bool(queue_entry)
2088 if queue_entry:
2089 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00002090 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00002091 self.host = host
showard170873e2009-01-07 00:22:26 +00002092
showarde788ea62008-11-17 21:02:47 +00002093 repair_task = RepairTask(host, queue_entry=queue_entry)
showarded2afea2009-07-07 20:54:07 +00002094 super(CleanupTask, self).__init__(
2095 task, ['--cleanup'], failure_tasks=[repair_task],
2096 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002097
2098 self._set_ids(host=host, queue_entries=[queue_entry])
mbligh16c722d2008-03-05 00:58:44 +00002099
mblighd5c95802008-03-05 00:33:46 +00002100
jadmanski0afbb632008-06-06 21:10:57 +00002101 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002102 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002103 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00002104 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00002105
mblighd5c95802008-03-05 00:33:46 +00002106
showard21baa452008-10-21 00:08:39 +00002107 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002108 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002109
showard21baa452008-10-21 00:08:39 +00002110 if self.success:
showardfa8629c2008-11-04 16:51:23 +00002111 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00002112 self.host.update_field('dirty', 0)
2113
2114
showardd3dc1992009-04-22 21:01:40 +00002115class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002116 _num_running_parses = 0
2117
showarded2afea2009-07-07 20:54:07 +00002118 def __init__(self, queue_entries, recover_run_monitor=None):
2119 super(FinalReparseTask, self).__init__(
2120 queue_entries, pidfile_name=_PARSER_PID_FILE,
2121 logfile_name='.parse.log',
2122 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002123 # don't use _set_ids, since we don't want to set the host_ids
2124 self.queue_entry_ids = [entry.id for entry in queue_entries]
showarded2afea2009-07-07 20:54:07 +00002125 self._parse_started = self.started
showard97aed502008-11-04 02:01:24 +00002126
showard97aed502008-11-04 02:01:24 +00002127
2128 @classmethod
2129 def _increment_running_parses(cls):
2130 cls._num_running_parses += 1
2131
2132
2133 @classmethod
2134 def _decrement_running_parses(cls):
2135 cls._num_running_parses -= 1
2136
2137
2138 @classmethod
2139 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002140 return (cls._num_running_parses <
2141 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002142
2143
2144 def prolog(self):
2145 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002146 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00002147
2148
2149 def epilog(self):
2150 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002151 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002152
2153
showardd3dc1992009-04-22 21:01:40 +00002154 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002155 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002156 results_dir]
showard97aed502008-11-04 02:01:24 +00002157
2158
showard08a36412009-05-05 01:01:13 +00002159 def tick(self):
2160 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002161 # and we can, at which point we revert to default behavior
2162 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002163 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002164 else:
2165 self._try_starting_parse()
2166
2167
2168 def run(self):
2169 # override run() to not actually run unless we can
2170 self._try_starting_parse()
2171
2172
2173 def _try_starting_parse(self):
2174 if not self._can_run_new_parse():
2175 return
showard170873e2009-01-07 00:22:26 +00002176
showard97aed502008-11-04 02:01:24 +00002177 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002178 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002179
showard97aed502008-11-04 02:01:24 +00002180 self._increment_running_parses()
2181 self._parse_started = True
2182
2183
2184 def finished(self, success):
2185 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002186 if self._parse_started:
2187 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002188
2189
showardc9ae1782009-01-30 01:42:37 +00002190class SetEntryPendingTask(AgentTask):
2191 def __init__(self, queue_entry):
2192 super(SetEntryPendingTask, self).__init__(cmd='')
2193 self._queue_entry = queue_entry
2194 self._set_ids(queue_entries=[queue_entry])
2195
2196
2197 def run(self):
2198 agent = self._queue_entry.on_pending()
2199 if agent:
2200 self.agent.dispatcher.add_agent(agent)
2201 self.finished(True)
2202
2203
showarda3c58572009-03-12 20:36:59 +00002204class DBError(Exception):
2205 """Raised by the DBObject constructor when its select fails."""
2206
2207
mbligh36768f02008-02-22 18:28:33 +00002208class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002209 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002210
2211 # Subclasses MUST override these:
2212 _table_name = ''
2213 _fields = ()
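
# An illustrative subclass (a sketch; compare the real subclasses
# below, such as IneligibleHostQueue):
#
#     class ExampleRow(DBObject):
#         _table_name = 'example_rows'
#         _fields = ('id', 'name')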
2214
showarda3c58572009-03-12 20:36:59 +00002215 # A mapping from (type, id) to the instance of the object for that
2216 # particular id. This prevents us from creating new Job() and Host()
2217 # instances for every HostQueueEntry object that we instantiate as
2218 # multiple HQEs often share the same Job.
2219 _instances_by_type_and_id = weakref.WeakValueDictionary()
2220 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002221
showarda3c58572009-03-12 20:36:59 +00002222
2223 def __new__(cls, id=None, **kwargs):
2224 """
2225 Look to see if we already have an instance for this particular type
2226 and id. If so, use it instead of creating a duplicate instance.
2227 """
2228 if id is not None:
2229 instance = cls._instances_by_type_and_id.get((cls, id))
2230 if instance:
2231 return instance
2232 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2233
2234
2235 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00002236 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00002237 assert self._table_name, '_table_name must be defined in your class'
2238 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002239 if not new_record:
2240 if self._initialized and not always_query:
2241 return # We've already been initialized.
2242 if id is None:
2243 id = row[0]
2244 # Tell future constructors to use us instead of re-querying while
2245 # this instance is still around.
2246 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002247
showard6ae5ea92009-02-25 00:11:51 +00002248 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002249
jadmanski0afbb632008-06-06 21:10:57 +00002250 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002251
jadmanski0afbb632008-06-06 21:10:57 +00002252 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002253 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002254
showarda3c58572009-03-12 20:36:59 +00002255 if self._initialized:
2256 differences = self._compare_fields_in_row(row)
2257 if differences:
showard7629f142009-03-27 21:02:02 +00002258 logging.warn(
2259 'initialized %s %s instance requery is updating: %s',
2260 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002261 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002262 self._initialized = True
2263
2264
2265 @classmethod
2266 def _clear_instance_cache(cls):
2267 """Used for testing, clear the internal instance cache."""
2268 cls._instances_by_type_and_id.clear()
2269
2270
showardccbd6c52009-03-21 00:10:21 +00002271 def _fetch_row_from_db(self, row_id):
2272 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2273 rows = _db.execute(sql, (row_id,))
2274 if not rows:
showard76e29d12009-04-15 21:53:10 +00002275 raise DBError("row not found (table=%s, row id=%s)"
2276 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002277 return rows[0]
2278
2279
showarda3c58572009-03-12 20:36:59 +00002280 def _assert_row_length(self, row):
2281 assert len(row) == len(self._fields), (
2282 "table = %s, row = %s/%d, fields = %s/%d" % (
2283 self.__table, row, len(row), self._fields, len(self._fields)))
2284
2285
2286 def _compare_fields_in_row(self, row):
2287 """
2288 Given a row as returned by a SELECT query, compare it to our existing
2289 in memory fields.
2290
2291 @param row - A sequence of values corresponding to fields named in
2292 The class attribute _fields.
2293
2294 @returns A dictionary listing the differences keyed by field name
2295 containing tuples of (current_value, row_value).
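
Example return value (illustrative): {'status': ('Running', 'Ready')}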
2296 """
2297 self._assert_row_length(row)
2298 differences = {}
2299 for field, row_value in itertools.izip(self._fields, row):
2300 current_value = getattr(self, field)
2301 if current_value != row_value:
2302 differences[field] = (current_value, row_value)
2303 return differences
showard2bab8f42008-11-12 18:15:22 +00002304
2305
2306 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002307 """
2308 Update our field attributes using a single row returned by SELECT.
2309
2310 @param row - A sequence of values corresponding to fields named in
2311 the class fields list.
2312 """
2313 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002314
showard2bab8f42008-11-12 18:15:22 +00002315 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002316 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002317 setattr(self, field, value)
2318 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002319
showard2bab8f42008-11-12 18:15:22 +00002320 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002321
mblighe2586682008-02-29 22:45:46 +00002322
showardccbd6c52009-03-21 00:10:21 +00002323 def update_from_database(self):
2324 assert self.id is not None
2325 row = self._fetch_row_from_db(self.id)
2326 self._update_fields_from_row(row)
2327
2328
jadmanski0afbb632008-06-06 21:10:57 +00002329 def count(self, where, table=None):
2330 if not table:
2331 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002332
jadmanski0afbb632008-06-06 21:10:57 +00002333 rows = _db.execute("""
2334 SELECT count(*) FROM %s
2335 WHERE %s
2336 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002337
jadmanski0afbb632008-06-06 21:10:57 +00002338 assert len(rows) == 1
2339
2340 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002341
2342
showardd3dc1992009-04-22 21:01:40 +00002343 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002344 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002345
showard2bab8f42008-11-12 18:15:22 +00002346 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002347 return
mbligh36768f02008-02-22 18:28:33 +00002348
mblighf8c624d2008-07-03 16:58:45 +00002349 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002350 _db.execute(query, (value, self.id))
2351
showard2bab8f42008-11-12 18:15:22 +00002352 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002353
2354
jadmanski0afbb632008-06-06 21:10:57 +00002355 def save(self):
2356 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002357 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002358 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002359 values = []
2360 for key in keys:
2361 value = getattr(self, key)
2362 if value is None:
2363 values.append('NULL')
2364 else:
2365 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002366 values_str = ','.join(values)
2367 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2368 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002369 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002370 # Update our id to the one the database just assigned to us.
2371 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002372
2373
jadmanski0afbb632008-06-06 21:10:57 +00002374 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002375 self._instances_by_type_and_id.pop((type(self), self.id), None)
2376 self._initialized = False
2377 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002378 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2379 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002380
2381
showard63a34772008-08-18 19:32:50 +00002382 @staticmethod
2383 def _prefix_with(string, prefix):
2384 if string:
2385 string = prefix + string
2386 return string
2387
2388
jadmanski0afbb632008-06-06 21:10:57 +00002389 @classmethod
showard989f25d2008-10-01 11:38:11 +00002390 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002391 """
2392 Construct instances of our class based on the given database query.
2393
2394 @yields One class instance for each row fetched.
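
Illustrative call (a sketch; the WHERE clause is hypothetical):

    for host in Host.fetch(where='locked = %s', params=(0,)):
        logging.info(host.hostname)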
2395 """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        for row in rows:
            yield cls(row=row)


class IneligibleHostQueue(DBObject):
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')


class AtomicGroup(DBObject):
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')


class Label(DBObject):
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)


class Host(DBObject):
    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def set_status(self, status):
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    def reverify_tasks(self):
        cleanup_task = CleanupTask(host=self)
        verify_task = VerifyTask(host=self)

        # Set the status now just to make sure this host does not get taken
        # away in the meantime.
        self.set_status('Cleaning')
        return [cleanup_task, verify_task]


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If either hostname does not match this pattern, they are
        simply compared as lower-case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfies most people's hostname sorting needs
        regardless of their exact naming schemes.  Nobody sane should have
        both a host10 and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # int() already ignores leading zeros; stripping them first would
            # crash on an all-zero suffix such as 'host0'.
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
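
    # Usage sketch (illustrative; hosts is assumed to be a list of Host
    # instances):
    #   hosts.sort(cmp=Host.cmp_for_sort)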


class HostQueueEntry(DBObject):
    _table_name = 'host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')


    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None

        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone
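
    # Usage sketch, per the docstring above:
    #   copy = HostQueueEntry.clone(entry)
    #   copy.save()  # only now does the copy get a row and a valid id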


    def _view_job_url(self):
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN jobs_dependency_labels AS deps "
                      "ON (labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        return self.host


    def queue_log_record(self, log_line):
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        if subdir is None:
            assert self.get_host()
            subdir = self.get_host().hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)


    def set_status(self, status):
        self.update_field('status', status)

        logging.info("%s -> %s", self, self.status)

        if status in ['Queued', 'Parsing']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Gathering']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)
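
        # Summary of the flag updates above:
        #   Queued, Parsing                 -> active=False, complete=False
        #   Pending through Gathering       -> active=True,  complete=False
        #   Failed/Completed/Stopped/Aborted-> active=False, complete=True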

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _email_on_status(self, status):
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def run_pre_job_tasks(self, assigned_host=None):
        if self.meta_host is not None or self.atomic_group:
            assert assigned_host
            # ensure results dir exists for the queue log
            if self.host_id is None:
                self.set_host(assigned_host)
            else:
                assert assigned_host.id == self.host_id

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        return self._do_run_pre_job_tasks()


    def _do_run_pre_job_tasks(self):
        # Every host goes through the Verifying stage (which may or may not
        # actually do anything, as determined by get_pre_job_tasks).
        self.set_status(models.HostQueueEntry.Status.VERIFYING)

        # The pre-job tasks always end with a SetEntryPendingTask, which
        # will continue as appropriate through queue_entry.on_pending().
        return Agent(self.job.get_pre_job_tasks(queue_entry=self))


    def requeue(self):
        assert self.host
        self.set_status('Queued')
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        self.job.stop_if_necessary()


    @property
    def aborted_by(self):
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, returns an agent to run the job.  Returns None
        otherwise.
        """
        self.set_status('Pending')
        self.get_host().set_status('Pending')
        return self.job.run_if_ready(queue_entry=self)


    def abort(self, dispatcher):
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        has_running_job_agent = (
                self.status in (Status.RUNNING, Status.GATHERING,
                                Status.PARSING)
                and dispatcher.get_agents_for_entry(self))
        if has_running_job_agent:
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING):
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))

        self.set_status(Status.ABORTED)


    def execution_tag(self):
        assert self.execution_subdir
        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        return self.execution_tag()


class Job(DBObject):
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs')

    # This does not need to be a column in the DB.  The delays are likely to
    # be configured short.  If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it.  Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time.  Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending()
    # on all status='Pending' atomic group HQEs in case a delay was running
    # when the scheduler was restarted and no more hosts ever successfully
    # exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)


    def is_server_job(self):
        return self.control_type != 2


    def tag(self):
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id = %s
                """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries) > 0

        return entries


    def set_status(self, status, update_queues=False):
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def _atomic_and_has_started(self):
        """
        @returns True if any of the HostQueueEntries associated with this job
        have entered the Status.STARTING state or beyond.
        """
        atomic_entries = models.HostQueueEntry.objects.filter(
                job=self.id, atomic_group__isnull=False)
        if atomic_entries.count() <= 0:
            return False

        # These states may *only* be reached if Job.run() has been called.
        started_statuses = (models.HostQueueEntry.Status.STARTING,
                            models.HostQueueEntry.Status.RUNNING,
                            models.HostQueueEntry.Status.COMPLETED)

        started_entries = atomic_entries.filter(status__in=started_statuses)
        return started_entries.count() > 0


    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()


    def is_ready(self):
        # NOTE: Atomic group jobs stop reporting ready after they have been
        # started, to avoid launching multiple copies of one atomic job.
        # Only possible if synch_count is less than half the number of
        # machines in the atomic group.
        return (self._pending_count() >= self.synch_count
                and not self._atomic_and_has_started())


    def num_machines(self, clause=None):
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        entries_to_stop = self._not_yet_run_entries(
                include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                    '%s status=%s, active=%s, complete=%s' %
                    (child_entry.id, child_entry.status, child_entry.active,
                     child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        hostname = queue_entry.get_host().hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self, group_name=''):
        """@returns a directory name to use for the next host group results."""
        if group_name:
            # Sanitize for use as a pathname.
            group_name = group_name.replace(os.path.sep, '_')
            if group_name.startswith('.'):
                group_name = '_' + group_name[1:]
            # Add a separator between the group name and 'group%d'.
            group_name += '.'
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
                job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)
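
    # Illustrative results (assuming this job already has subdirs group0 and
    # group1, and none prefixed with 'rack1.'):
    #   self._next_group_name()        -> 'group2'
    #   self._next_group_name('rack1') -> 'rack1.group0'  (its own counter)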


    def _write_control_file(self, execution_tag):
        control_path = _drone_manager.attach_file_to_execution(
                execution_tag, self.control_file)
        return control_path


    def get_group_entries(self, queue_entry_from_group):
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
                where='job_id=%s AND execution_subdir=%s',
                params=(self.id, execution_subdir)))


    def _get_autoserv_params(self, queue_entries):
        assert queue_entries
        execution_tag = queue_entries[0].execution_tag()
        control_path = self._write_control_file(execution_tag)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = _autoserv_command_line(
                hostnames,
                ['-P', execution_tag, '-n',
                 _drone_manager.absolute_path(control_path)],
                job=self, verbose=False)

        if not self.is_server_job():
            params.append('-c')

        return params
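
    # The resulting command line resembles the following sketch; the exact
    # form comes from _autoserv_command_line(), defined elsewhere in this
    # file:
    #   autoserv ... -P <execution_tag> -n <control_path> [-c]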


    def _should_run_cleanup(self, queue_entry):
        if self.reboot_before == models.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == models.RebootBefore.IF_DIRTY:
            return queue_entry.get_host().dirty
        return False


    def should_run_verify(self, queue_entry):
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def get_pre_job_tasks(self, queue_entry):
        """
        Get a list of tasks to perform before the host_queue_entry
        may be used to run this Job (such as Cleanup & Verify).

        @returns A list of tasks to be done to the given queue_entry before
                it should be considered ready to run this job.  The last
                task in the list calls HostQueueEntry.on_pending(), which
                continues the flow of the job.
        """
        tasks = []
        if self._should_run_cleanup(queue_entry):
            tasks.append(CleanupTask(queue_entry=queue_entry))
        if self.should_run_verify(queue_entry):
            tasks.append(VerifyTask(queue_entry=queue_entry))
        tasks.append(SetEntryPendingTask(queue_entry))
        return tasks
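
    # For example (illustrative): a dirty host with reboot_before=IF_DIRTY on
    # a job with run_verify set yields
    #   [CleanupTask, VerifyTask, SetEntryPendingTask]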


    def _assign_new_group(self, queue_entries, group_name=''):
        if len(queue_entries) == 1:
            group_subdir_name = queue_entries[0].get_host().hostname
        else:
            group_subdir_name = self._next_group_name(group_name)
            logging.info('Running synchronous job %d hosts %s as %s',
                         self.id,
                         [entry.host.hostname for entry in queue_entries],
                         group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A tuple containing a list of HostQueueEntry instances to be
                used to run this Job and a string group name to suggest
                giving to this job in the results database.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                    where=where_clause,
                    params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        assert len(chosen_entries) >= self.synch_count

        if atomic_group:
            # Look at any meta_host and dependency labels and pick the first
            # one that also specifies this atomic group.  Use that label name
            # as the group name if possible (it is more specific).
            group_name = atomic_group.name
            for label in include_queue_entry.get_labels():
                if label.atomic_group_id:
                    assert label.atomic_group_id == atomic_group.id
                    group_name = label.name
                    break
        else:
            group_name = ''

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries, group_name


    def run_if_ready(self, queue_entry):
        """
        @returns An Agent instance to ultimately run this job if enough hosts
        are ready for it to run.
        @returns None and potentially cleans up excess hosts if this Job
        is not ready to run.
        """
        if not self.is_ready():
            self.stop_if_necessary()
            return None

        if queue_entry.atomic_group:
            return self.run_with_ready_delay(queue_entry)

        return self.run(queue_entry)


    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, the delay cannot be reset.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run_if_ready when the delay is up.

        @returns An Agent to run the job as appropriate or None if a delay
                has already been set.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        pending_threshold = queue_entry.atomic_group.max_number_of_machines
        over_max_threshold = (self._pending_count() >= pending_threshold)
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough?  Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            return self.run(queue_entry)

        # A delay was previously scheduled.
        if self._delay_ready_task:
            return None

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self.id)
            return self.run(queue_entry)

        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)

        return Agent([self._delay_ready_task], num_processes=0)
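
    # Lifecycle sketch (illustrative): the first atomic-group HQE to reach
    # Pending schedules a DelayedCallTask; later HQEs see the pending task
    # and get None back; the job runs once the delay expires or
    # max_number_of_machines hosts are Pending, whichever comes first.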


    def run(self, queue_entry):
        """
        @param queue_entry: The HostQueueEntry instance calling this method.
        @returns An Agent instance to run this job or None if we've already
                been run.
        """
        if queue_entry.atomic_group and self._atomic_and_has_started():
            logging.error('Job.run() called on running atomic Job %d '
                          'with HQE %s.', self.id, queue_entry)
            return None
        queue_entries, group_name = self._choose_group_to_run(queue_entry)
        return self._finish_run(queue_entries, group_name)


    def _finish_run(self, queue_entries, group_name):
        for queue_entry in queue_entries:
            queue_entry.set_status('Starting')
        params = self._get_autoserv_params(queue_entries)
        queue_task = QueueTask(job=self, queue_entries=queue_entries,
                               cmd=params, group_name=group_name)
        tasks = [queue_task]
        if self._delay_ready_task:
            # Cancel any pending callback that would try to run again
            # as we are already running.
            self._delay_ready_task.abort()

        return Agent(tasks, num_processes=len(queue_entries))


if __name__ == '__main__':
    main()