blob: 464ad5e7b1a616aeec69e1d670875145cce43e95 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard402934a2009-12-21 22:20:47 +00008import common
showardef519212009-05-08 02:29:53 +00009import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +000010import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardf13a9e22009-12-18 22:54:09 +000011import itertools, logging, weakref, gc
showard402934a2009-12-21 22:20:47 +000012
mbligh8bcd23a2009-02-03 19:14:06 +000013import MySQLdb
showard402934a2009-12-21 22:20:47 +000014
showard043c62a2009-06-10 19:48:57 +000015from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000016from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000017
18import django.db
19
showard136e6dc2009-06-10 19:38:49 +000020from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000021from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000022from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000023from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000024from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000025from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000026from autotest_lib.scheduler import status_server, scheduler_config
jamesren883492a2010-02-12 00:45:18 +000027from autotest_lib.scheduler import gc_stats, metahost_scheduler
mbligh70feeee2008-06-11 16:20:49 +000028
showard549afad2009-08-20 23:33:36 +000029BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
30PID_FILE_PREFIX = 'monitor_db'
mblighb090f142008-02-27 21:33:46 +000031
mbligh36768f02008-02-22 18:28:33 +000032RESULTS_DIR = '.'
33AUTOSERV_NICE_LEVEL = 10
showard170873e2009-01-07 00:22:26 +000034DB_CONFIG_SECTION = 'AUTOTEST_WEB'
mbligh36768f02008-02-22 18:28:33 +000035AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
36
37if os.environ.has_key('AUTOTEST_DIR'):
jadmanski0afbb632008-06-06 21:10:57 +000038 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
mbligh36768f02008-02-22 18:28:33 +000039AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
40AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
41
42if AUTOTEST_SERVER_DIR not in sys.path:
jadmanski0afbb632008-06-06 21:10:57 +000043 sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000044
showardd3dc1992009-04-22 21:01:40 +000045_AUTOSERV_PID_FILE = '.autoserv_execute'
46_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
47_PARSER_PID_FILE = '.parser_execute'
mbligh4608b002010-01-05 18:22:35 +000048_ARCHIVER_PID_FILE = '.archiver_execute'
showardd3dc1992009-04-22 21:01:40 +000049
50_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
mbligh4608b002010-01-05 18:22:35 +000051 _PARSER_PID_FILE, _ARCHIVER_PID_FILE)
showardd3dc1992009-04-22 21:01:40 +000052
showard35162b02009-03-03 02:17:30 +000053# error message to leave in results dir when an autoserv process disappears
54# mysteriously
55_LOST_PROCESS_ERROR = """\
56Autoserv failed abnormally during execution for this job, probably due to a
57system error on the Autotest server. Full results may not be available. Sorry.
58"""
59
mbligh6f8bab42008-02-29 22:45:14 +000060_db = None
mbligh36768f02008-02-22 18:28:33 +000061_shutdown = False
showard170873e2009-01-07 00:22:26 +000062_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
63_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
mbligh4314a712008-02-29 22:44:30 +000064_testing_mode = False
showard542e8402008-09-19 20:16:18 +000065_base_url = None
showardc85c21b2008-11-24 22:17:37 +000066_notify_email_statuses = []
showard170873e2009-01-07 00:22:26 +000067_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000068
69
showardec6a3b92009-09-25 20:29:13 +000070def _get_pidfile_timeout_secs():
71 """@returns How long to wait for autoserv to write pidfile."""
72 pidfile_timeout_mins = global_config.global_config.get_config_value(
73 scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
74 return pidfile_timeout_mins * 60
75
76
mbligh83c1e9e2009-05-01 23:10:41 +000077def _site_init_monitor_db_dummy():
78 return {}
79
80
jamesren883492a2010-02-12 00:45:18 +000081get_metahost_schedulers = utils.import_site_function(
82 __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
83 'get_metahost_schedulers', metahost_scheduler.get_metahost_schedulers)
84
85
mbligh36768f02008-02-22 18:28:33 +000086def main():
showard27f33872009-04-07 18:20:53 +000087 try:
showard549afad2009-08-20 23:33:36 +000088 try:
89 main_without_exception_handling()
90 except SystemExit:
91 raise
92 except:
93 logging.exception('Exception escaping in monitor_db')
94 raise
95 finally:
96 utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000097
98
def main_without_exception_handling():
    """Parse arguments, configure global state and run the dispatch loop.

    Expects exactly one positional argument: the results directory.  Reads
    scheduler settings from the global config, starts the status server and
    then ticks the Dispatcher until _shutdown is set (by SIGINT) or an
    exception escapes the loop, after which queued emails are flushed and
    the server, drone manager and database connection are shut down.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        # Fixed grammar in the user-facing message ("to enabled it").
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Run site-specific initialization hook if one is installed.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    # Statuses (comma/space/semicolon/colon separated) that trigger email
    # notifications; stored lowercased in the module global.
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        # Test mode: substitute a dummy autoserv binary.
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main scheduling loop; _shutdown is flipped by the SIGINT handler.
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Best-effort cleanup: flush emails and shut everything down.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000186
187
showard136e6dc2009-06-10 19:38:49 +0000188def setup_logging():
189 log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
190 log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
191 logging_manager.configure_logging(
192 scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
193 logfile_name=log_name)
194
195
mbligh36768f02008-02-22 18:28:33 +0000196def handle_sigint(signum, frame):
jadmanski0afbb632008-06-06 21:10:57 +0000197 global _shutdown
198 _shutdown = True
showardb18134f2009-03-20 20:52:18 +0000199 logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000200
201
showard136e6dc2009-06-10 19:38:49 +0000202def init():
showardb18134f2009-03-20 20:52:18 +0000203 logging.info("%s> dispatcher starting", time.strftime("%X %x"))
204 logging.info("My PID is %d", os.getpid())
mbligh36768f02008-02-22 18:28:33 +0000205
showard8de37132009-08-31 18:33:08 +0000206 if utils.program_is_alive(PID_FILE_PREFIX):
showard549afad2009-08-20 23:33:36 +0000207 logging.critical("monitor_db already running, aborting!")
208 sys.exit(1)
209 utils.write_pid(PID_FILE_PREFIX)
mblighfb676032009-04-01 18:25:38 +0000210
showardb1e51872008-10-07 11:08:18 +0000211 if _testing_mode:
212 global_config.global_config.override_config_value(
showard170873e2009-01-07 00:22:26 +0000213 DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')
showardb1e51872008-10-07 11:08:18 +0000214
jadmanski0afbb632008-06-06 21:10:57 +0000215 os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
216 global _db
showard170873e2009-01-07 00:22:26 +0000217 _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
showardb21b8c82009-12-07 19:39:39 +0000218 _db.connect(db_type='django')
mbligh36768f02008-02-22 18:28:33 +0000219
showardfa8629c2008-11-04 16:51:23 +0000220 # ensure Django connection is in autocommit
221 setup_django_environment.enable_autocommit()
showard844960a2009-05-29 18:41:18 +0000222 # bypass the readonly connection
223 readonly_connection.ReadOnlyConnection.set_globally_disabled(True)
showardfa8629c2008-11-04 16:51:23 +0000224
showardb18134f2009-03-20 20:52:18 +0000225 logging.info("Setting signal handler")
jadmanski0afbb632008-06-06 21:10:57 +0000226 signal.signal(signal.SIGINT, handle_sigint)
227
showardd1ee1dd2009-01-07 21:33:08 +0000228 drones = global_config.global_config.get_config_value(
229 scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
230 drone_list = [hostname.strip() for hostname in drones.split(',')]
showard170873e2009-01-07 00:22:26 +0000231 results_host = global_config.global_config.get_config_value(
showardd1ee1dd2009-01-07 21:33:08 +0000232 scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
showard170873e2009-01-07 00:22:26 +0000233 _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)
234
showardb18134f2009-03-20 20:52:18 +0000235 logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000236
237
showarded2afea2009-07-07 20:54:07 +0000238def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
239 verbose=True):
showardf1ae3542009-05-11 19:26:02 +0000240 """
241 @returns The autoserv command line as a list of executable + parameters.
242
243 @param machines - string - A machine or comma separated list of machines
244 for the (-m) flag.
showardf1ae3542009-05-11 19:26:02 +0000245 @param extra_args - list - Additional arguments to pass to autoserv.
246 @param job - Job object - If supplied, -u owner and -l name parameters
247 will be added.
248 @param queue_entry - A HostQueueEntry object - If supplied and no Job
249 object was supplied, this will be used to lookup the Job object.
250 """
showarda9545c02009-12-18 22:44:26 +0000251 autoserv_argv = [_autoserv_path, '-p',
showarded2afea2009-07-07 20:54:07 +0000252 '-r', drone_manager.WORKING_DIRECTORY]
showarda9545c02009-12-18 22:44:26 +0000253 if machines:
254 autoserv_argv += ['-m', machines]
showard87ba02a2009-04-20 19:37:32 +0000255 if job or queue_entry:
256 if not job:
257 job = queue_entry.job
258 autoserv_argv += ['-u', job.owner, '-l', job.name]
showarde9c69362009-06-30 01:58:03 +0000259 if verbose:
260 autoserv_argv.append('--verbose')
showard87ba02a2009-04-20 19:37:32 +0000261 return autoserv_argv + extra_args
262
263
showard89f84db2009-03-12 20:39:13 +0000264class SchedulerError(Exception):
265 """Raised by HostScheduler when an inconsistent state occurs."""
266
267
jamesren883492a2010-02-12 00:45:18 +0000268class HostScheduler(metahost_scheduler.HostSchedulingUtility):
269 """Handles the logic for choosing when to run jobs and on which hosts.
270
271 This class makes several queries to the database on each tick, building up
272 some auxiliary data structures and using them to determine which hosts are
273 eligible to run which jobs, taking into account all the various factors that
274 affect that.
275
276 In the past this was done with one or two very large, complex database
277 queries. It has proven much simpler and faster to build these auxiliary
278 data structures and perform the logic in Python.
279 """
280 def __init__(self):
281 self._metahost_schedulers = get_metahost_schedulers()
282
283
showard63a34772008-08-18 19:32:50 +0000284 def _get_ready_hosts(self):
285 # avoid any host with a currently active queue entry against it
286 hosts = Host.fetch(
showardeab66ce2009-12-23 00:03:56 +0000287 joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
288 'ON (afe_hosts.id = active_hqe.host_id AND '
showardb1e51872008-10-07 11:08:18 +0000289 'active_hqe.active)',
showard63a34772008-08-18 19:32:50 +0000290 where="active_hqe.host_id IS NULL "
showardeab66ce2009-12-23 00:03:56 +0000291 "AND NOT afe_hosts.locked "
292 "AND (afe_hosts.status IS NULL "
293 "OR afe_hosts.status = 'Ready')")
showard63a34772008-08-18 19:32:50 +0000294 return dict((host.id, host) for host in hosts)
295
296
297 @staticmethod
298 def _get_sql_id_list(id_list):
299 return ','.join(str(item_id) for item_id in id_list)
300
301
302 @classmethod
showard989f25d2008-10-01 11:38:11 +0000303 def _get_many2many_dict(cls, query, id_list, flip=False):
mbligh849a0f62008-08-28 20:12:19 +0000304 if not id_list:
305 return {}
showard63a34772008-08-18 19:32:50 +0000306 query %= cls._get_sql_id_list(id_list)
307 rows = _db.execute(query)
showard989f25d2008-10-01 11:38:11 +0000308 return cls._process_many2many_dict(rows, flip)
309
310
311 @staticmethod
312 def _process_many2many_dict(rows, flip=False):
showard63a34772008-08-18 19:32:50 +0000313 result = {}
314 for row in rows:
showard89f84db2009-03-12 20:39:13 +0000315 left_id, right_id = int(row[0]), int(row[1])
showard989f25d2008-10-01 11:38:11 +0000316 if flip:
317 left_id, right_id = right_id, left_id
showard63a34772008-08-18 19:32:50 +0000318 result.setdefault(left_id, set()).add(right_id)
319 return result
320
321
322 @classmethod
323 def _get_job_acl_groups(cls, job_ids):
324 query = """
showardeab66ce2009-12-23 00:03:56 +0000325 SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
326 FROM afe_jobs
327 INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
328 INNER JOIN afe_acl_groups_users ON
329 afe_acl_groups_users.user_id = afe_users.id
330 WHERE afe_jobs.id IN (%s)
showard63a34772008-08-18 19:32:50 +0000331 """
332 return cls._get_many2many_dict(query, job_ids)
333
334
335 @classmethod
336 def _get_job_ineligible_hosts(cls, job_ids):
337 query = """
338 SELECT job_id, host_id
showardeab66ce2009-12-23 00:03:56 +0000339 FROM afe_ineligible_host_queues
showard63a34772008-08-18 19:32:50 +0000340 WHERE job_id IN (%s)
341 """
342 return cls._get_many2many_dict(query, job_ids)
343
344
345 @classmethod
showard989f25d2008-10-01 11:38:11 +0000346 def _get_job_dependencies(cls, job_ids):
347 query = """
348 SELECT job_id, label_id
showardeab66ce2009-12-23 00:03:56 +0000349 FROM afe_jobs_dependency_labels
showard989f25d2008-10-01 11:38:11 +0000350 WHERE job_id IN (%s)
351 """
352 return cls._get_many2many_dict(query, job_ids)
353
354
355 @classmethod
showard63a34772008-08-18 19:32:50 +0000356 def _get_host_acls(cls, host_ids):
357 query = """
showardd9ac4452009-02-07 02:04:37 +0000358 SELECT host_id, aclgroup_id
showardeab66ce2009-12-23 00:03:56 +0000359 FROM afe_acl_groups_hosts
showard63a34772008-08-18 19:32:50 +0000360 WHERE host_id IN (%s)
361 """
362 return cls._get_many2many_dict(query, host_ids)
363
364
365 @classmethod
366 def _get_label_hosts(cls, host_ids):
showardfa8629c2008-11-04 16:51:23 +0000367 if not host_ids:
368 return {}, {}
showard63a34772008-08-18 19:32:50 +0000369 query = """
370 SELECT label_id, host_id
showardeab66ce2009-12-23 00:03:56 +0000371 FROM afe_hosts_labels
showard63a34772008-08-18 19:32:50 +0000372 WHERE host_id IN (%s)
showard989f25d2008-10-01 11:38:11 +0000373 """ % cls._get_sql_id_list(host_ids)
374 rows = _db.execute(query)
375 labels_to_hosts = cls._process_many2many_dict(rows)
376 hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
377 return labels_to_hosts, hosts_to_labels
378
379
380 @classmethod
381 def _get_labels(cls):
382 return dict((label.id, label) for label in Label.fetch())
showard63a34772008-08-18 19:32:50 +0000383
384
385 def refresh(self, pending_queue_entries):
386 self._hosts_available = self._get_ready_hosts()
387
388 relevant_jobs = [queue_entry.job_id
389 for queue_entry in pending_queue_entries]
390 self._job_acls = self._get_job_acl_groups(relevant_jobs)
391 self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
showard989f25d2008-10-01 11:38:11 +0000392 self._job_dependencies = self._get_job_dependencies(relevant_jobs)
showard63a34772008-08-18 19:32:50 +0000393
394 host_ids = self._hosts_available.keys()
395 self._host_acls = self._get_host_acls(host_ids)
showard989f25d2008-10-01 11:38:11 +0000396 self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)
397
398 self._labels = self._get_labels()
showard63a34772008-08-18 19:32:50 +0000399
400
jamesren883492a2010-02-12 00:45:18 +0000401 def hosts_in_label(self, label_id):
402 """Return potentially usable hosts with the given label."""
403 return set(self._label_hosts.get(label_id, ()))
404
405
406 def remove_host_from_label(self, host_id, label_id):
407 """Remove this host from the internal list of usable hosts in the label.
408
409 This is provided as an optimization -- when code gets a host from a
410 label and concludes it's unusable, it can call this to avoid getting
411 that host again in the future (within this tick).
412 """
413 self._label_hosts[label_id].remove(host_id)
414
415
416 def pop_host(self, host_id):
417 """Remove and return a host from the internal list of available hosts.
418 """
419 return self._hosts_available.pop(host_id)
420
421
422 def ineligible_hosts_for_entry(self, queue_entry):
423 """Get the list of hosts ineligible to run the given queue entry."""
424 return set(self._ineligible_hosts.get(queue_entry.job_id, ()))
425
426
showard63a34772008-08-18 19:32:50 +0000427 def _is_acl_accessible(self, host_id, queue_entry):
428 job_acls = self._job_acls.get(queue_entry.job_id, set())
429 host_acls = self._host_acls.get(host_id, set())
430 return len(host_acls.intersection(job_acls)) > 0
431
432
showard989f25d2008-10-01 11:38:11 +0000433 def _check_job_dependencies(self, job_dependencies, host_labels):
434 missing = job_dependencies - host_labels
showard89f84db2009-03-12 20:39:13 +0000435 return len(missing) == 0
showard989f25d2008-10-01 11:38:11 +0000436
437
438 def _check_only_if_needed_labels(self, job_dependencies, host_labels,
439 queue_entry):
showardade14e22009-01-26 22:38:32 +0000440 if not queue_entry.meta_host:
441 # bypass only_if_needed labels when a specific host is selected
442 return True
443
showard989f25d2008-10-01 11:38:11 +0000444 for label_id in host_labels:
445 label = self._labels[label_id]
446 if not label.only_if_needed:
447 # we don't care about non-only_if_needed labels
448 continue
449 if queue_entry.meta_host == label_id:
450 # if the label was requested in a metahost it's OK
451 continue
452 if label_id not in job_dependencies:
453 return False
454 return True
455
456
showard89f84db2009-03-12 20:39:13 +0000457 def _check_atomic_group_labels(self, host_labels, queue_entry):
458 """
459 Determine if the given HostQueueEntry's atomic group settings are okay
460 to schedule on a host with the given labels.
461
showard6157c632009-07-06 20:19:31 +0000462 @param host_labels: A list of label ids that the host has.
463 @param queue_entry: The HostQueueEntry being considered for the host.
showard89f84db2009-03-12 20:39:13 +0000464
465 @returns True if atomic group settings are okay, False otherwise.
466 """
showard6157c632009-07-06 20:19:31 +0000467 return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
showard89f84db2009-03-12 20:39:13 +0000468 queue_entry.atomic_group_id)
469
470
showard6157c632009-07-06 20:19:31 +0000471 def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
showard89f84db2009-03-12 20:39:13 +0000472 """
473 Return the atomic group label id for a host with the given set of
474 labels if any, or None otherwise. Raises an exception if more than
475 one atomic group are found in the set of labels.
476
showard6157c632009-07-06 20:19:31 +0000477 @param host_labels: A list of label ids that the host has.
478 @param queue_entry: The HostQueueEntry we're testing. Only used for
479 extra info in a potential logged error message.
showard89f84db2009-03-12 20:39:13 +0000480
481 @returns The id of the atomic group found on a label in host_labels
482 or None if no atomic group label is found.
showard89f84db2009-03-12 20:39:13 +0000483 """
showard6157c632009-07-06 20:19:31 +0000484 atomic_labels = [self._labels[label_id] for label_id in host_labels
485 if self._labels[label_id].atomic_group_id is not None]
486 atomic_ids = set(label.atomic_group_id for label in atomic_labels)
showard89f84db2009-03-12 20:39:13 +0000487 if not atomic_ids:
488 return None
489 if len(atomic_ids) > 1:
showard6157c632009-07-06 20:19:31 +0000490 logging.error('More than one Atomic Group on HQE "%s" via: %r',
491 queue_entry, atomic_labels)
492 return atomic_ids.pop()
showard89f84db2009-03-12 20:39:13 +0000493
494
495 def _get_atomic_group_labels(self, atomic_group_id):
496 """
497 Lookup the label ids that an atomic_group is associated with.
498
499 @param atomic_group_id - The id of the AtomicGroup to look up.
500
501 @returns A generator yeilding Label ids for this atomic group.
502 """
503 return (id for id, label in self._labels.iteritems()
504 if label.atomic_group_id == atomic_group_id
505 and not label.invalid)
506
507
showard54c1ea92009-05-20 00:32:58 +0000508 def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
showard89f84db2009-03-12 20:39:13 +0000509 """
510 @param group_hosts - A sequence of Host ids to test for usability
511 and eligibility against the Job associated with queue_entry.
512 @param queue_entry - The HostQueueEntry that these hosts are being
513 tested for eligibility against.
514
515 @returns A subset of group_hosts Host ids that are eligible for the
516 supplied queue_entry.
517 """
518 return set(host_id for host_id in group_hosts
jamesren883492a2010-02-12 00:45:18 +0000519 if self.is_host_usable(host_id)
520 and self.is_host_eligible_for_job(host_id, queue_entry))
showard89f84db2009-03-12 20:39:13 +0000521
522
jamesren883492a2010-02-12 00:45:18 +0000523 def is_host_eligible_for_job(self, host_id, queue_entry):
showard2924b0a2009-06-18 23:16:15 +0000524 if self._is_host_invalid(host_id):
525 # if an invalid host is scheduled for a job, it's a one-time host
526 # and it therefore bypasses eligibility checks. note this can only
527 # happen for non-metahosts, because invalid hosts have their label
528 # relationships cleared.
529 return True
530
showard989f25d2008-10-01 11:38:11 +0000531 job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
532 host_labels = self._host_labels.get(host_id, set())
mblighc993bee2008-10-03 03:42:34 +0000533
showard89f84db2009-03-12 20:39:13 +0000534 return (self._is_acl_accessible(host_id, queue_entry) and
535 self._check_job_dependencies(job_dependencies, host_labels) and
536 self._check_only_if_needed_labels(
537 job_dependencies, host_labels, queue_entry) and
538 self._check_atomic_group_labels(host_labels, queue_entry))
showard989f25d2008-10-01 11:38:11 +0000539
540
showard2924b0a2009-06-18 23:16:15 +0000541 def _is_host_invalid(self, host_id):
542 host_object = self._hosts_available.get(host_id, None)
543 return host_object and host_object.invalid
544
545
showard63a34772008-08-18 19:32:50 +0000546 def _schedule_non_metahost(self, queue_entry):
jamesren883492a2010-02-12 00:45:18 +0000547 if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000548 return None
549 return self._hosts_available.pop(queue_entry.host_id, None)
550
551
jamesren883492a2010-02-12 00:45:18 +0000552 def is_host_usable(self, host_id):
showard63a34772008-08-18 19:32:50 +0000553 if host_id not in self._hosts_available:
554 # host was already used during this scheduling cycle
555 return False
556 if self._hosts_available[host_id].invalid:
557 # Invalid hosts cannot be used for metahosts. They're included in
558 # the original query because they can be used by non-metahosts.
559 return False
560 return True
561
562
jamesren883492a2010-02-12 00:45:18 +0000563 def schedule_entry(self, queue_entry):
564 if queue_entry.host_id is not None:
showard63a34772008-08-18 19:32:50 +0000565 return self._schedule_non_metahost(queue_entry)
jamesren883492a2010-02-12 00:45:18 +0000566
567 for scheduler in self._metahost_schedulers:
568 if scheduler.can_schedule_metahost(queue_entry):
569 scheduler.schedule_metahost(queue_entry, self)
570 return None
571
572 raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)
showard63a34772008-08-18 19:32:50 +0000573
574
showard89f84db2009-03-12 20:39:13 +0000575 def find_eligible_atomic_group(self, queue_entry):
576 """
577 Given an atomic group host queue entry, locate an appropriate group
578 of hosts for the associated job to run on.
579
580 The caller is responsible for creating new HQEs for the additional
581 hosts returned in order to run the actual job on them.
582
583 @returns A list of Host instances in a ready state to satisfy this
584 atomic group scheduling. Hosts will all belong to the same
585 atomic group label as specified by the queue_entry.
586 An empty list will be returned if no suitable atomic
587 group could be found.
588
589 TODO(gps): what is responsible for kicking off any attempted repairs on
590 a group of hosts? not this function, but something needs to. We do
591 not communicate that reason for returning [] outside of here...
592 For now, we'll just be unschedulable if enough hosts within one group
593 enter Repair Failed state.
594 """
595 assert queue_entry.atomic_group_id is not None
596 job = queue_entry.job
597 assert job.synch_count and job.synch_count > 0
showard77182562009-06-10 00:16:05 +0000598 atomic_group = queue_entry.atomic_group
showard89f84db2009-03-12 20:39:13 +0000599 if job.synch_count > atomic_group.max_number_of_machines:
600 # Such a Job and HostQueueEntry should never be possible to
601 # create using the frontend. Regardless, we can't process it.
602 # Abort it immediately and log an error on the scheduler.
603 queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
showard7629f142009-03-27 21:02:02 +0000604 logging.error(
605 'Error: job %d synch_count=%d > requested atomic_group %d '
606 'max_number_of_machines=%d. Aborted host_queue_entry %d.',
607 job.id, job.synch_count, atomic_group.id,
608 atomic_group.max_number_of_machines, queue_entry.id)
showard89f84db2009-03-12 20:39:13 +0000609 return []
jamesren883492a2010-02-12 00:45:18 +0000610 hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
611 ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000612
613 # Look in each label associated with atomic_group until we find one with
614 # enough hosts to satisfy the job.
615 for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
jamesren883492a2010-02-12 00:45:18 +0000616 group_hosts = set(self.hosts_in_label(atomic_label_id))
showard89f84db2009-03-12 20:39:13 +0000617 if queue_entry.meta_host is not None:
618 # If we have a metahost label, only allow its hosts.
619 group_hosts.intersection_update(hosts_in_label)
620 group_hosts -= ineligible_host_ids
showard54c1ea92009-05-20 00:32:58 +0000621 eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
showard89f84db2009-03-12 20:39:13 +0000622 group_hosts, queue_entry)
623
624 # Job.synch_count is treated as "minimum synch count" when
625 # scheduling for an atomic group of hosts. The atomic group
626 # number of machines is the maximum to pick out of a single
627 # atomic group label for scheduling at one time.
628 min_hosts = job.synch_count
629 max_hosts = atomic_group.max_number_of_machines
630
showard54c1ea92009-05-20 00:32:58 +0000631 if len(eligible_host_ids_in_group) < min_hosts:
showard89f84db2009-03-12 20:39:13 +0000632 # Not enough eligible hosts in this atomic group label.
633 continue
634
showard54c1ea92009-05-20 00:32:58 +0000635 eligible_hosts_in_group = [self._hosts_available[id]
636 for id in eligible_host_ids_in_group]
showardef519212009-05-08 02:29:53 +0000637 # So that they show up in a sane order when viewing the job.
showard54c1ea92009-05-20 00:32:58 +0000638 eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)
showardef519212009-05-08 02:29:53 +0000639
showard89f84db2009-03-12 20:39:13 +0000640 # Limit ourselves to scheduling the atomic group size.
641 if len(eligible_hosts_in_group) > max_hosts:
showardef519212009-05-08 02:29:53 +0000642 eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]
showard89f84db2009-03-12 20:39:13 +0000643
644 # Remove the selected hosts from our cached internal state
645 # of available hosts in order to return the Host objects.
646 host_list = []
showard54c1ea92009-05-20 00:32:58 +0000647 for host in eligible_hosts_in_group:
648 hosts_in_label.discard(host.id)
649 self._hosts_available.pop(host.id)
650 host_list.append(host)
showard89f84db2009-03-12 20:39:13 +0000651 return host_list
652
653 return []
654
655
showard170873e2009-01-07 00:22:26 +0000656class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000657 def __init__(self):
658 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000659 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000660 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000661 user_cleanup_time = scheduler_config.config.clean_interval
662 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
663 _db, user_cleanup_time)
664 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000665 self._host_agents = {}
666 self._queue_entry_agents = {}
showardf13a9e22009-12-18 22:54:09 +0000667 self._tick_count = 0
668 self._last_garbage_stats_time = time.time()
669 self._seconds_between_garbage_stats = 60 * (
670 global_config.global_config.get_config_value(
671 scheduler_config.CONFIG_SECTION,
672 'gc_stats_interval_mins', type=int, default=6*60))
mbligh36768f02008-02-22 18:28:33 +0000673
mbligh36768f02008-02-22 18:28:33 +0000674
showard915958d2009-04-22 21:00:58 +0000675 def initialize(self, recover_hosts=True):
676 self._periodic_cleanup.initialize()
677 self._24hr_upkeep.initialize()
678
jadmanski0afbb632008-06-06 21:10:57 +0000679 # always recover processes
680 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000681
jadmanski0afbb632008-06-06 21:10:57 +0000682 if recover_hosts:
683 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000684
685
jadmanski0afbb632008-06-06 21:10:57 +0000686 def tick(self):
showardf13a9e22009-12-18 22:54:09 +0000687 self._garbage_collection()
showard170873e2009-01-07 00:22:26 +0000688 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000689 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000690 self._find_aborting()
showard29f7cd22009-04-29 21:16:24 +0000691 self._process_recurring_runs()
showard8cc058f2009-09-08 16:26:33 +0000692 self._schedule_delay_tasks()
showard8cc058f2009-09-08 16:26:33 +0000693 self._schedule_running_host_queue_entries()
694 self._schedule_special_tasks()
showard65db3932009-10-28 19:54:35 +0000695 self._schedule_new_jobs()
jadmanski0afbb632008-06-06 21:10:57 +0000696 self._handle_agents()
showard170873e2009-01-07 00:22:26 +0000697 _drone_manager.execute_actions()
698 email_manager.manager.send_queued_emails()
showard402934a2009-12-21 22:20:47 +0000699 django.db.reset_queries()
showardf13a9e22009-12-18 22:54:09 +0000700 self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000701
showard97aed502008-11-04 02:01:24 +0000702
mblighf3294cc2009-04-08 21:17:38 +0000703 def _run_cleanup(self):
704 self._periodic_cleanup.run_cleanup_maybe()
705 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000706
mbligh36768f02008-02-22 18:28:33 +0000707
showardf13a9e22009-12-18 22:54:09 +0000708 def _garbage_collection(self):
709 threshold_time = time.time() - self._seconds_between_garbage_stats
710 if threshold_time < self._last_garbage_stats_time:
711 # Don't generate these reports very often.
712 return
713
714 self._last_garbage_stats_time = time.time()
715 # Force a full level 0 collection (because we can, it doesn't hurt
716 # at this interval).
717 gc.collect()
718 logging.info('Logging garbage collector stats on tick %d.',
719 self._tick_count)
720 gc_stats._log_garbage_collector_stats()
721
722
showard170873e2009-01-07 00:22:26 +0000723 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
724 for object_id in object_ids:
725 agent_dict.setdefault(object_id, set()).add(agent)
726
727
728 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
729 for object_id in object_ids:
730 assert object_id in agent_dict
731 agent_dict[object_id].remove(agent)
732
733
showardd1195652009-12-08 22:21:02 +0000734 def add_agent_task(self, agent_task):
735 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000736 self._agents.append(agent)
737 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000738 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
739 self._register_agent_for_ids(self._queue_entry_agents,
740 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000741
showard170873e2009-01-07 00:22:26 +0000742
743 def get_agents_for_entry(self, queue_entry):
744 """
745 Find agents corresponding to the specified queue_entry.
746 """
showardd3dc1992009-04-22 21:01:40 +0000747 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000748
749
750 def host_has_agent(self, host):
751 """
752 Determine if there is currently an Agent present using this host.
753 """
754 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000755
756
jadmanski0afbb632008-06-06 21:10:57 +0000757 def remove_agent(self, agent):
758 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000759 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
760 agent)
761 self._unregister_agent_for_ids(self._queue_entry_agents,
762 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000763
764
showard8cc058f2009-09-08 16:26:33 +0000765 def _host_has_scheduled_special_task(self, host):
766 return bool(models.SpecialTask.objects.filter(host__id=host.id,
767 is_active=False,
768 is_complete=False))
769
770
jadmanski0afbb632008-06-06 21:10:57 +0000771 def _recover_processes(self):
showardd1195652009-12-08 22:21:02 +0000772 agent_tasks = self._create_recovery_agent_tasks()
773 self._register_pidfiles(agent_tasks)
showard170873e2009-01-07 00:22:26 +0000774 _drone_manager.refresh()
showardd1195652009-12-08 22:21:02 +0000775 self._recover_tasks(agent_tasks)
showard8cc058f2009-09-08 16:26:33 +0000776 self._recover_pending_entries()
showardb8900452009-10-12 20:31:01 +0000777 self._check_for_unrecovered_verifying_entries()
showard170873e2009-01-07 00:22:26 +0000778 self._reverify_remaining_hosts()
779 # reinitialize drones after killing orphaned processes, since they can
780 # leave around files when they die
781 _drone_manager.execute_actions()
782 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000783
showard170873e2009-01-07 00:22:26 +0000784
showardd1195652009-12-08 22:21:02 +0000785 def _create_recovery_agent_tasks(self):
786 return (self._get_queue_entry_agent_tasks()
787 + self._get_special_task_agent_tasks(is_active=True))
788
789
790 def _get_queue_entry_agent_tasks(self):
791 # host queue entry statuses handled directly by AgentTasks (Verifying is
792 # handled through SpecialTasks, so is not listed here)
793 statuses = (models.HostQueueEntry.Status.STARTING,
794 models.HostQueueEntry.Status.RUNNING,
795 models.HostQueueEntry.Status.GATHERING,
mbligh4608b002010-01-05 18:22:35 +0000796 models.HostQueueEntry.Status.PARSING,
797 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000798 status_list = ','.join("'%s'" % status for status in statuses)
showard170873e2009-01-07 00:22:26 +0000799 queue_entries = HostQueueEntry.fetch(
showardd1195652009-12-08 22:21:02 +0000800 where='status IN (%s)' % status_list)
801
802 agent_tasks = []
803 used_queue_entries = set()
804 for entry in queue_entries:
805 if self.get_agents_for_entry(entry):
806 # already being handled
807 continue
808 if entry in used_queue_entries:
809 # already picked up by a synchronous job
810 continue
811 agent_task = self._get_agent_task_for_queue_entry(entry)
812 agent_tasks.append(agent_task)
813 used_queue_entries.update(agent_task.queue_entries)
814 return agent_tasks
showard170873e2009-01-07 00:22:26 +0000815
816
showardd1195652009-12-08 22:21:02 +0000817 def _get_special_task_agent_tasks(self, is_active=False):
818 special_tasks = models.SpecialTask.objects.filter(
819 is_active=is_active, is_complete=False)
820 return [self._get_agent_task_for_special_task(task)
821 for task in special_tasks]
822
823
824 def _get_agent_task_for_queue_entry(self, queue_entry):
825 """
826 Construct an AgentTask instance for the given active HostQueueEntry,
827 if one can currently run it.
828 @param queue_entry: a HostQueueEntry
829 @returns an AgentTask to run the queue entry
830 """
831 task_entries = queue_entry.job.get_group_entries(queue_entry)
832 self._check_for_duplicate_host_entries(task_entries)
833
834 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
835 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000836 if queue_entry.is_hostless():
837 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000838 return QueueTask(queue_entries=task_entries)
839 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
840 return GatherLogsTask(queue_entries=task_entries)
841 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
842 return FinalReparseTask(queue_entries=task_entries)
mbligh4608b002010-01-05 18:22:35 +0000843 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
844 return ArchiveResultsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000845
846 raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
847 'invalid status %s: %s' % (entry.status, entry))
848
849
850 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000851 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
852 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000853 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000854 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000855 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000856 if using_host:
showardd1195652009-12-08 22:21:02 +0000857 self._assert_host_has_no_agent(task_entry)
858
859
860 def _assert_host_has_no_agent(self, entry):
861 """
862 @param entry: a HostQueueEntry or a SpecialTask
863 """
864 if self.host_has_agent(entry.host):
865 agent = tuple(self._host_agents.get(entry.host.id))[0]
866 raise SchedulerError(
867 'While scheduling %s, host %s already has a host agent %s'
868 % (entry, entry.host, agent.task))
869
870
871 def _get_agent_task_for_special_task(self, special_task):
872 """
873 Construct an AgentTask class to run the given SpecialTask and add it
874 to this dispatcher.
875 @param special_task: a models.SpecialTask instance
876 @returns an AgentTask to run this SpecialTask
877 """
878 self._assert_host_has_no_agent(special_task)
879
880 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
881 for agent_task_class in special_agent_task_classes:
882 if agent_task_class.TASK_TYPE == special_task.task:
883 return agent_task_class(task=special_task)
884
885 raise SchedulerError('No AgentTask class for task', str(special_task))
886
887
888 def _register_pidfiles(self, agent_tasks):
889 for agent_task in agent_tasks:
890 agent_task.register_necessary_pidfiles()
891
892
893 def _recover_tasks(self, agent_tasks):
894 orphans = _drone_manager.get_orphaned_autoserv_processes()
895
896 for agent_task in agent_tasks:
897 agent_task.recover()
898 if agent_task.monitor and agent_task.monitor.has_process():
899 orphans.discard(agent_task.monitor.get_process())
900 self.add_agent_task(agent_task)
901
902 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000903
904
showard8cc058f2009-09-08 16:26:33 +0000905 def _get_unassigned_entries(self, status):
906 for entry in HostQueueEntry.fetch(where="status = '%s'" % status):
showard0db3d432009-10-12 20:29:15 +0000907 if entry.status == status and not self.get_agents_for_entry(entry):
908 # The status can change during iteration, e.g., if job.run()
909 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000910 yield entry
911
912
showard6878e8b2009-07-20 22:37:45 +0000913 def _check_for_remaining_orphan_processes(self, orphans):
914 if not orphans:
915 return
916 subject = 'Unrecovered orphan autoserv processes remain'
917 message = '\n'.join(str(process) for process in orphans)
918 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000919
920 die_on_orphans = global_config.global_config.get_config_value(
921 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
922
923 if die_on_orphans:
924 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000925
showard170873e2009-01-07 00:22:26 +0000926
showard8cc058f2009-09-08 16:26:33 +0000927 def _recover_pending_entries(self):
928 for entry in self._get_unassigned_entries(
929 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000930 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000931 entry.on_pending()
932
933
showardb8900452009-10-12 20:31:01 +0000934 def _check_for_unrecovered_verifying_entries(self):
showard170873e2009-01-07 00:22:26 +0000935 queue_entries = HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000936 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
937 unrecovered_hqes = []
938 for queue_entry in queue_entries:
939 special_tasks = models.SpecialTask.objects.filter(
940 task__in=(models.SpecialTask.Task.CLEANUP,
941 models.SpecialTask.Task.VERIFY),
942 queue_entry__id=queue_entry.id,
943 is_complete=False)
944 if special_tasks.count() == 0:
945 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000946
showardb8900452009-10-12 20:31:01 +0000947 if unrecovered_hqes:
948 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
showarde8e37072009-08-20 23:31:30 +0000949 raise SchedulerError(
showard37757f32009-10-19 18:34:24 +0000950 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000951 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000952
953
showard65db3932009-10-28 19:54:35 +0000954 def _get_prioritized_special_tasks(self):
955 """
956 Returns all queued SpecialTasks prioritized for repair first, then
957 cleanup, then verify.
958 """
959 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
960 is_complete=False,
961 host__locked=False)
962 # exclude hosts with active queue entries unless the SpecialTask is for
963 # that queue entry
showard7e67b432010-01-20 01:13:04 +0000964 queued_tasks = models.SpecialTask.objects.add_join(
showardeab66ce2009-12-23 00:03:56 +0000965 queued_tasks, 'afe_host_queue_entries', 'host_id',
966 join_condition='afe_host_queue_entries.active',
showard7e67b432010-01-20 01:13:04 +0000967 join_from_key='host_id', force_left_join=True)
showard65db3932009-10-28 19:54:35 +0000968 queued_tasks = queued_tasks.extra(
showardeab66ce2009-12-23 00:03:56 +0000969 where=['(afe_host_queue_entries.id IS NULL OR '
970 'afe_host_queue_entries.id = '
971 'afe_special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000972
showard65db3932009-10-28 19:54:35 +0000973 # reorder tasks by priority
974 task_priority_order = [models.SpecialTask.Task.REPAIR,
975 models.SpecialTask.Task.CLEANUP,
976 models.SpecialTask.Task.VERIFY]
977 def task_priority_key(task):
978 return task_priority_order.index(task.task)
979 return sorted(queued_tasks, key=task_priority_key)
980
981
showard65db3932009-10-28 19:54:35 +0000982 def _schedule_special_tasks(self):
983 """
984 Execute queued SpecialTasks that are ready to run on idle hosts.
985 """
986 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000987 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000988 continue
showardd1195652009-12-08 22:21:02 +0000989 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000990
991
showard170873e2009-01-07 00:22:26 +0000992 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000993 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000994 # should never happen
showarded2afea2009-07-07 20:54:07 +0000995 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000996 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000997 self._reverify_hosts_where(
showard8cc058f2009-09-08 16:26:33 +0000998 "status IN ('Repairing', 'Verifying', 'Cleaning')",
showarded2afea2009-07-07 20:54:07 +0000999 print_message=message)
mblighbb421852008-03-11 22:36:16 +00001000
1001
jadmanski0afbb632008-06-06 21:10:57 +00001002 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +00001003 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +00001004 full_where='locked = 0 AND invalid = 0 AND ' + where
1005 for host in Host.fetch(where=full_where):
1006 if self.host_has_agent(host):
1007 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +00001008 continue
showard8cc058f2009-09-08 16:26:33 +00001009 if self._host_has_scheduled_special_task(host):
1010 # host will have a special task scheduled on the next cycle
1011 continue
showard170873e2009-01-07 00:22:26 +00001012 if print_message:
showardb18134f2009-03-20 20:52:18 +00001013 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +00001014 models.SpecialTask.objects.create(
1015 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +00001016 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +00001017
1018
jadmanski0afbb632008-06-06 21:10:57 +00001019 def _recover_hosts(self):
1020 # recover "Repair Failed" hosts
1021 message = 'Reverifying dead host %s'
1022 self._reverify_hosts_where("status = 'Repair Failed'",
1023 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +00001024
1025
showard04c82c52008-05-29 19:38:12 +00001026
showardb95b1bd2008-08-15 18:11:04 +00001027 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +00001028 # prioritize by job priority, then non-metahost over metahost, then FIFO
1029 return list(HostQueueEntry.fetch(
showardeab66ce2009-12-23 00:03:56 +00001030 joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
showardac9ce222008-12-03 18:19:44 +00001031 where='NOT complete AND NOT active AND status="Queued"',
showardeab66ce2009-12-23 00:03:56 +00001032 order_by='afe_jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +00001033
1034
showard89f84db2009-03-12 20:39:13 +00001035 def _refresh_pending_queue_entries(self):
1036 """
1037 Lookup the pending HostQueueEntries and call our HostScheduler
1038 refresh() method given that list. Return the list.
1039
1040 @returns A list of pending HostQueueEntries sorted in priority order.
1041 """
showard63a34772008-08-18 19:32:50 +00001042 queue_entries = self._get_pending_queue_entries()
1043 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +00001044 return []
showardb95b1bd2008-08-15 18:11:04 +00001045
showard63a34772008-08-18 19:32:50 +00001046 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +00001047
showard89f84db2009-03-12 20:39:13 +00001048 return queue_entries
1049
1050
1051 def _schedule_atomic_group(self, queue_entry):
1052 """
1053 Schedule the given queue_entry on an atomic group of hosts.
1054
1055 Returns immediately if there are insufficient available hosts.
1056
1057 Creates new HostQueueEntries based off of queue_entry for the
1058 scheduled hosts and starts them all running.
1059 """
1060 # This is a virtual host queue entry representing an entire
1061 # atomic group, find a group and schedule their hosts.
1062 group_hosts = self._host_scheduler.find_eligible_atomic_group(
1063 queue_entry)
1064 if not group_hosts:
1065 return
showardcbe6f942009-06-17 19:33:49 +00001066
1067 logging.info('Expanding atomic group entry %s with hosts %s',
1068 queue_entry,
1069 ', '.join(host.hostname for host in group_hosts))
jamesren883492a2010-02-12 00:45:18 +00001070
showard89f84db2009-03-12 20:39:13 +00001071 for assigned_host in group_hosts[1:]:
1072 # Create a new HQE for every additional assigned_host.
1073 new_hqe = HostQueueEntry.clone(queue_entry)
1074 new_hqe.save()
jamesren883492a2010-02-12 00:45:18 +00001075 new_hqe.set_host(assigned_host)
1076 self._run_queue_entry(new_hqe)
1077
1078 # The first assigned host uses the original HostQueueEntry
1079 queue_entry.set_host(group_hosts[0])
1080 self._run_queue_entry(queue_entry)
showard89f84db2009-03-12 20:39:13 +00001081
1082
showarda9545c02009-12-18 22:44:26 +00001083 def _schedule_hostless_job(self, queue_entry):
1084 self.add_agent_task(HostlessQueueTask(queue_entry))
1085
1086
showard89f84db2009-03-12 20:39:13 +00001087 def _schedule_new_jobs(self):
1088 queue_entries = self._refresh_pending_queue_entries()
1089 if not queue_entries:
1090 return
1091
showard63a34772008-08-18 19:32:50 +00001092 for queue_entry in queue_entries:
showarde55955f2009-10-07 20:48:58 +00001093 is_unassigned_atomic_group = (
1094 queue_entry.atomic_group_id is not None
1095 and queue_entry.host_id is None)
jamesren883492a2010-02-12 00:45:18 +00001096
1097 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +00001098 self._schedule_hostless_job(queue_entry)
jamesren883492a2010-02-12 00:45:18 +00001099 elif is_unassigned_atomic_group:
1100 self._schedule_atomic_group(queue_entry)
showarde55955f2009-10-07 20:48:58 +00001101 else:
jamesren883492a2010-02-12 00:45:18 +00001102 assigned_host = self._host_scheduler.schedule_entry(queue_entry)
showard65db3932009-10-28 19:54:35 +00001103 if assigned_host and not self.host_has_agent(assigned_host):
jamesren883492a2010-02-12 00:45:18 +00001104 assert assigned_host.id == queue_entry.host_id
1105 self._run_queue_entry(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +00001106
1107
showard8cc058f2009-09-08 16:26:33 +00001108 def _schedule_running_host_queue_entries(self):
showardd1195652009-12-08 22:21:02 +00001109 for agent_task in self._get_queue_entry_agent_tasks():
1110 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +00001111
1112
1113 def _schedule_delay_tasks(self):
showardd2014822009-10-12 20:26:58 +00001114 for entry in HostQueueEntry.fetch(where='status = "%s"' %
1115 models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +00001116 task = entry.job.schedule_delayed_callback_task(entry)
1117 if task:
showardd1195652009-12-08 22:21:02 +00001118 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +00001119
1120
jamesren883492a2010-02-12 00:45:18 +00001121 def _run_queue_entry(self, queue_entry):
1122 queue_entry.schedule_pre_job_tasks()
mblighd5c95802008-03-05 00:33:46 +00001123
1124
jadmanski0afbb632008-06-06 21:10:57 +00001125 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +00001126 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +00001127 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +00001128 for agent in self.get_agents_for_entry(entry):
1129 agent.abort()
1130 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001131
1132
showard324bf812009-01-20 23:23:38 +00001133 def _can_start_agent(self, agent, num_started_this_cycle,
1134 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001135 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +00001136 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +00001137 return True
1138 # don't allow any nonzero-process agents to run after we've reached a
1139 # limit (this avoids starvation of many-process agents)
1140 if have_reached_limit:
1141 return False
1142 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001143 max_runnable_processes = _drone_manager.max_runnable_processes(
showardd1195652009-12-08 22:21:02 +00001144 agent.task.owner_username)
1145 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001146 return False
1147 # if a single agent exceeds the per-cycle throttling, still allow it to
1148 # run when it's the first agent in the cycle
1149 if num_started_this_cycle == 0:
1150 return True
1151 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +00001152 if (num_started_this_cycle + agent.task.num_processes >
1153 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001154 return False
1155 return True
1156
1157
jadmanski0afbb632008-06-06 21:10:57 +00001158 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001159 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001160 have_reached_limit = False
1161 # iterate over copy, so we can remove agents during iteration
1162 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001163 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001164 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001165 have_reached_limit):
1166 have_reached_limit = True
1167 continue
showardd1195652009-12-08 22:21:02 +00001168 num_started_this_cycle += agent.task.num_processes
showard4c5374f2008-09-04 17:02:56 +00001169 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001170 if agent.is_done():
1171 logging.info("agent finished")
1172 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001173 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001174 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001175
1176
showard29f7cd22009-04-29 21:16:24 +00001177 def _process_recurring_runs(self):
1178 recurring_runs = models.RecurringRun.objects.filter(
1179 start_date__lte=datetime.datetime.now())
1180 for rrun in recurring_runs:
1181 # Create job from template
1182 job = rrun.job
1183 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001184 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001185
1186 host_objects = info['hosts']
1187 one_time_hosts = info['one_time_hosts']
1188 metahost_objects = info['meta_hosts']
1189 dependencies = info['dependencies']
1190 atomic_group = info['atomic_group']
1191
1192 for host in one_time_hosts or []:
1193 this_host = models.Host.create_one_time_host(host.hostname)
1194 host_objects.append(this_host)
1195
1196 try:
1197 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001198 options=options,
showard29f7cd22009-04-29 21:16:24 +00001199 host_objects=host_objects,
1200 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001201 atomic_group=atomic_group)
1202
1203 except Exception, ex:
1204 logging.exception(ex)
1205 #TODO send email
1206
1207 if rrun.loop_count == 1:
1208 rrun.delete()
1209 else:
1210 if rrun.loop_count != 0: # if not infinite loop
1211 # calculate new start_date
1212 difference = datetime.timedelta(seconds=rrun.loop_period)
1213 rrun.start_date = rrun.start_date + difference
1214 rrun.loop_count -= 1
1215 rrun.save()
1216
1217
showard170873e2009-01-07 00:22:26 +00001218class PidfileRunMonitor(object):
1219 """
1220 Client must call either run() to start a new process or
1221 attach_to_existing_process().
1222 """
mbligh36768f02008-02-22 18:28:33 +00001223
    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """
mbligh36768f02008-02-22 18:28:33 +00001229
1230
showard170873e2009-01-07 00:22:26 +00001231 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001232 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001233 self._start_time = None
1234 self.pidfile_id = None
1235 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001236
1237
showard170873e2009-01-07 00:22:26 +00001238 def _add_nice_command(self, command, nice_level):
1239 if not nice_level:
1240 return command
1241 return ['nice', '-n', str(nice_level)] + command
1242
1243
1244 def _set_start_time(self):
1245 self._start_time = time.time()
1246
1247
showard418785b2009-11-23 20:19:59 +00001248 def run(self, command, working_directory, num_processes, nice_level=None,
1249 log_file=None, pidfile_name=None, paired_with_pidfile=None,
1250 username=None):
showard170873e2009-01-07 00:22:26 +00001251 assert command is not None
1252 if nice_level is not None:
1253 command = ['nice', '-n', str(nice_level)] + command
1254 self._set_start_time()
1255 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001256 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001257 num_processes=num_processes, log_file=log_file,
1258 paired_with_pidfile=paired_with_pidfile, username=username)
showard170873e2009-01-07 00:22:26 +00001259
1260
showarded2afea2009-07-07 20:54:07 +00001261 def attach_to_existing_process(self, execution_path,
showardd1195652009-12-08 22:21:02 +00001262 pidfile_name=_AUTOSERV_PID_FILE,
1263 num_processes=None):
showard170873e2009-01-07 00:22:26 +00001264 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001265 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001266 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +00001267 if num_processes is not None:
1268 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +00001269
1270
jadmanski0afbb632008-06-06 21:10:57 +00001271 def kill(self):
showard170873e2009-01-07 00:22:26 +00001272 if self.has_process():
1273 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001274
mbligh36768f02008-02-22 18:28:33 +00001275
showard170873e2009-01-07 00:22:26 +00001276 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001277 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001278 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001279
1280
showard170873e2009-01-07 00:22:26 +00001281 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001282 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001283 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001284 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001285
1286
showard170873e2009-01-07 00:22:26 +00001287 def _read_pidfile(self, use_second_read=False):
1288 assert self.pidfile_id is not None, (
1289 'You must call run() or attach_to_existing_process()')
1290 contents = _drone_manager.get_pidfile_contents(
1291 self.pidfile_id, use_second_read=use_second_read)
1292 if contents.is_invalid():
1293 self._state = drone_manager.PidfileContents()
1294 raise self._PidfileException(contents)
1295 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001296
1297
showard21baa452008-10-21 00:08:39 +00001298 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001299 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1300 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001301 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001302 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001303
1304
1305 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001306 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001307 return
mblighbb421852008-03-11 22:36:16 +00001308
showard21baa452008-10-21 00:08:39 +00001309 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001310
showard170873e2009-01-07 00:22:26 +00001311 if self._state.process is None:
1312 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001313 return
mbligh90a549d2008-03-25 23:52:34 +00001314
showard21baa452008-10-21 00:08:39 +00001315 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001316 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001317 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001318 return
mbligh90a549d2008-03-25 23:52:34 +00001319
showard170873e2009-01-07 00:22:26 +00001320 # pid but no running process - maybe process *just* exited
1321 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001322 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001323 # autoserv exited without writing an exit code
1324 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001325 self._handle_pidfile_error(
1326 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001327
showard21baa452008-10-21 00:08:39 +00001328
1329 def _get_pidfile_info(self):
1330 """\
1331 After completion, self._state will contain:
1332 pid=None, exit_status=None if autoserv has not yet run
1333 pid!=None, exit_status=None if autoserv is running
1334 pid!=None, exit_status!=None if autoserv has completed
1335 """
1336 try:
1337 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001338 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001339 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001340
1341
showard170873e2009-01-07 00:22:26 +00001342 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001343 """\
1344 Called when no pidfile is found or no pid is in the pidfile.
1345 """
showard170873e2009-01-07 00:22:26 +00001346 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001347 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001348 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001349 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001350 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001351
1352
showard35162b02009-03-03 02:17:30 +00001353 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001354 """\
1355 Called when autoserv has exited without writing an exit status,
1356 or we've timed out waiting for autoserv to write a pid to the
1357 pidfile. In either case, we just return failure and the caller
1358 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001359
showard170873e2009-01-07 00:22:26 +00001360 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001361 """
1362 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001363 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001364 self._state.exit_status = 1
1365 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001366
1367
jadmanski0afbb632008-06-06 21:10:57 +00001368 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001369 self._get_pidfile_info()
1370 return self._state.exit_status
1371
1372
1373 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001374 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001375 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001376 if self._state.num_tests_failed is None:
1377 return -1
showard21baa452008-10-21 00:08:39 +00001378 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001379
1380
showardcdaeae82009-08-31 18:32:48 +00001381 def try_copy_results_on_drone(self, **kwargs):
1382 if self.has_process():
1383 # copy results logs into the normal place for job results
1384 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1385
1386
1387 def try_copy_to_results_repository(self, source, **kwargs):
1388 if self.has_process():
1389 _drone_manager.copy_to_results_repository(self.get_process(),
1390 source, **kwargs)
1391
1392
mbligh36768f02008-02-22 18:28:33 +00001393class Agent(object):
showard77182562009-06-10 00:16:05 +00001394 """
showard8cc058f2009-09-08 16:26:33 +00001395 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001396
1397 The following methods are required on all task objects:
1398 poll() - Called periodically to let the task check its status and
1399 update its internal state. If the task succeeded.
1400 is_done() - Returns True if the task is finished.
1401 abort() - Called when an abort has been requested. The task must
1402 set its aborted attribute to True if it actually aborted.
1403
1404 The following attributes are required on all task objects:
1405 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001406 success - bool, True if this task succeeded.
1407 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1408 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001409 """
1410
1411
showard418785b2009-11-23 20:19:59 +00001412 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001413 """
showard8cc058f2009-09-08 16:26:33 +00001414 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001415 """
showard8cc058f2009-09-08 16:26:33 +00001416 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001417
showard77182562009-06-10 00:16:05 +00001418 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001419 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001420
showard8cc058f2009-09-08 16:26:33 +00001421 self.queue_entry_ids = task.queue_entry_ids
1422 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001423
showard8cc058f2009-09-08 16:26:33 +00001424 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001425 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001426
1427
jadmanski0afbb632008-06-06 21:10:57 +00001428 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001429 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001430 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001431 self.task.poll()
1432 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001433 self.finished = True
showardec113162008-05-08 00:52:49 +00001434
1435
jadmanski0afbb632008-06-06 21:10:57 +00001436 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001437 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001438
1439
showardd3dc1992009-04-22 21:01:40 +00001440 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001441 if self.task:
1442 self.task.abort()
1443 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001444 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001445 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001446
showardd3dc1992009-04-22 21:01:40 +00001447
showard77182562009-06-10 00:16:05 +00001448class DelayedCallTask(object):
1449 """
1450 A task object like AgentTask for an Agent to run that waits for the
1451 specified amount of time to have elapsed before calling the supplied
1452 callback once and finishing. If the callback returns anything, it is
1453 assumed to be a new Agent instance and will be added to the dispatcher.
1454
1455 @attribute end_time: The absolute posix time after which this task will
1456 call its callback when it is polled and be finished.
1457
1458 Also has all attributes required by the Agent class.
1459 """
1460 def __init__(self, delay_seconds, callback, now_func=None):
1461 """
1462 @param delay_seconds: The delay in seconds from now that this task
1463 will call the supplied callback and be done.
1464 @param callback: A callable to be called by this task once after at
1465 least delay_seconds time has elapsed. It must return None
1466 or a new Agent instance.
1467 @param now_func: A time.time like function. Default: time.time.
1468 Used for testing.
1469 """
1470 assert delay_seconds > 0
1471 assert callable(callback)
1472 if not now_func:
1473 now_func = time.time
1474 self._now_func = now_func
1475 self._callback = callback
1476
1477 self.end_time = self._now_func() + delay_seconds
1478
1479 # These attributes are required by Agent.
1480 self.aborted = False
showard77182562009-06-10 00:16:05 +00001481 self.host_ids = ()
1482 self.success = False
1483 self.queue_entry_ids = ()
showard418785b2009-11-23 20:19:59 +00001484 self.num_processes = 0
showard77182562009-06-10 00:16:05 +00001485
1486
1487 def poll(self):
showard8cc058f2009-09-08 16:26:33 +00001488 if not self.is_done() and self._now_func() >= self.end_time:
1489 self._callback()
showard77182562009-06-10 00:16:05 +00001490 self.success = True
1491
1492
1493 def is_done(self):
showard8cc058f2009-09-08 16:26:33 +00001494 return self.success or self.aborted
showard77182562009-06-10 00:16:05 +00001495
1496
1497 def abort(self):
1498 self.aborted = True
showard77182562009-06-10 00:16:05 +00001499
1500
mbligh36768f02008-02-22 18:28:33 +00001501class AgentTask(object):
showardd1195652009-12-08 22:21:02 +00001502 class _NullMonitor(object):
1503 pidfile_id = None
1504
1505 def has_process(self):
1506 return True
1507
1508
1509 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001510 """
showardd1195652009-12-08 22:21:02 +00001511 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001512 """
jadmanski0afbb632008-06-06 21:10:57 +00001513 self.done = False
showardd1195652009-12-08 22:21:02 +00001514 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001515 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001516 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001517 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001518 self.queue_entry_ids = []
1519 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001520 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001521
1522
1523 def _set_ids(self, host=None, queue_entries=None):
1524 if queue_entries and queue_entries != [None]:
1525 self.host_ids = [entry.host.id for entry in queue_entries]
1526 self.queue_entry_ids = [entry.id for entry in queue_entries]
1527 else:
1528 assert host
1529 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001530
1531
jadmanski0afbb632008-06-06 21:10:57 +00001532 def poll(self):
showard08a36412009-05-05 01:01:13 +00001533 if not self.started:
1534 self.start()
showardd1195652009-12-08 22:21:02 +00001535 if not self.done:
1536 self.tick()
showard08a36412009-05-05 01:01:13 +00001537
1538
1539 def tick(self):
showardd1195652009-12-08 22:21:02 +00001540 assert self.monitor
1541 exit_code = self.monitor.exit_code()
1542 if exit_code is None:
1543 return
mbligh36768f02008-02-22 18:28:33 +00001544
showardd1195652009-12-08 22:21:02 +00001545 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001546 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001547
1548
jadmanski0afbb632008-06-06 21:10:57 +00001549 def is_done(self):
1550 return self.done
mbligh36768f02008-02-22 18:28:33 +00001551
1552
jadmanski0afbb632008-06-06 21:10:57 +00001553 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001554 if self.done:
showardd1195652009-12-08 22:21:02 +00001555 assert self.started
showard08a36412009-05-05 01:01:13 +00001556 return
showardd1195652009-12-08 22:21:02 +00001557 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001558 self.done = True
1559 self.success = success
1560 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001561
1562
jadmanski0afbb632008-06-06 21:10:57 +00001563 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001564 """
1565 To be overridden.
1566 """
showarded2afea2009-07-07 20:54:07 +00001567 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001568 self.register_necessary_pidfiles()
1569
1570
1571 def _log_file(self):
1572 if not self._log_file_name:
1573 return None
1574 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001575
mbligh36768f02008-02-22 18:28:33 +00001576
jadmanski0afbb632008-06-06 21:10:57 +00001577 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001578 log_file = self._log_file()
1579 if self.monitor and log_file:
1580 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001581
1582
jadmanski0afbb632008-06-06 21:10:57 +00001583 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001584 """
1585 To be overridden.
1586 """
jadmanski0afbb632008-06-06 21:10:57 +00001587 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001588 logging.info("%s finished with success=%s", type(self).__name__,
1589 self.success)
1590
mbligh36768f02008-02-22 18:28:33 +00001591
1592
jadmanski0afbb632008-06-06 21:10:57 +00001593 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001594 if not self.started:
1595 self.prolog()
1596 self.run()
1597
1598 self.started = True
1599
1600
1601 def abort(self):
1602 if self.monitor:
1603 self.monitor.kill()
1604 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001605 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001606 self.cleanup()
1607
1608
showarded2afea2009-07-07 20:54:07 +00001609 def _get_consistent_execution_path(self, execution_entries):
1610 first_execution_path = execution_entries[0].execution_path()
1611 for execution_entry in execution_entries[1:]:
1612 assert execution_entry.execution_path() == first_execution_path, (
1613 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1614 execution_entry,
1615 first_execution_path,
1616 execution_entries[0]))
1617 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001618
1619
showarded2afea2009-07-07 20:54:07 +00001620 def _copy_results(self, execution_entries, use_monitor=None):
1621 """
1622 @param execution_entries: list of objects with execution_path() method
1623 """
showard6d1c1432009-08-20 23:30:39 +00001624 if use_monitor is not None and not use_monitor.has_process():
1625 return
1626
showarded2afea2009-07-07 20:54:07 +00001627 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001628 if use_monitor is None:
1629 assert self.monitor
1630 use_monitor = self.monitor
1631 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001632 execution_path = self._get_consistent_execution_path(execution_entries)
1633 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001634 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001635
showarda1e74b32009-05-12 17:32:04 +00001636
1637 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001638 for queue_entry in queue_entries:
1639 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001640
1641
mbligh4608b002010-01-05 18:22:35 +00001642 def _archive_results(self, queue_entries):
1643 for queue_entry in queue_entries:
1644 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001645
1646
showardd1195652009-12-08 22:21:02 +00001647 def _command_line(self):
1648 """
1649 Return the command line to run. Must be overridden.
1650 """
1651 raise NotImplementedError
1652
1653
1654 @property
1655 def num_processes(self):
1656 """
1657 Return the number of processes forked by this AgentTask's process. It
1658 may only be approximate. To be overridden if necessary.
1659 """
1660 return 1
1661
1662
1663 def _paired_with_monitor(self):
1664 """
1665 If this AgentTask's process must run on the same machine as some
1666 previous process, this method should be overridden to return a
1667 PidfileRunMonitor for that process.
1668 """
1669 return self._NullMonitor()
1670
1671
1672 @property
1673 def owner_username(self):
1674 """
1675 Return login of user responsible for this task. May be None. Must be
1676 overridden.
1677 """
1678 raise NotImplementedError
1679
1680
1681 def _working_directory(self):
1682 """
1683 Return the directory where this AgentTask's process executes. Must be
1684 overridden.
1685 """
1686 raise NotImplementedError
1687
1688
1689 def _pidfile_name(self):
1690 """
1691 Return the name of the pidfile this AgentTask's process uses. To be
1692 overridden if necessary.
1693 """
1694 return _AUTOSERV_PID_FILE
1695
1696
1697 def _check_paired_results_exist(self):
1698 if not self._paired_with_monitor().has_process():
1699 email_manager.manager.enqueue_notify_email(
1700 'No paired results in task',
1701 'No paired results in task %s at %s'
1702 % (self, self._paired_with_monitor().pidfile_id))
1703 self.finished(False)
1704 return False
1705 return True
1706
1707
1708 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001709 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001710 self.monitor = PidfileRunMonitor()
1711
1712
1713 def run(self):
1714 if not self._check_paired_results_exist():
1715 return
1716
1717 self._create_monitor()
1718 self.monitor.run(
1719 self._command_line(), self._working_directory(),
1720 num_processes=self.num_processes,
1721 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1722 pidfile_name=self._pidfile_name(),
1723 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
1724 username=self.owner_username)
1725
1726
1727 def register_necessary_pidfiles(self):
1728 pidfile_id = _drone_manager.get_pidfile_id_from(
1729 self._working_directory(), self._pidfile_name())
1730 _drone_manager.register_pidfile(pidfile_id)
1731
1732 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1733 if paired_pidfile_id:
1734 _drone_manager.register_pidfile(paired_pidfile_id)
1735
1736
1737 def recover(self):
1738 if not self._check_paired_results_exist():
1739 return
1740
1741 self._create_monitor()
1742 self.monitor.attach_to_existing_process(
1743 self._working_directory(), pidfile_name=self._pidfile_name(),
1744 num_processes=self.num_processes)
1745 if not self.monitor.has_process():
1746 # no process to recover; wait to be started normally
1747 self.monitor = None
1748 return
1749
1750 self.started = True
1751 logging.info('Recovering process %s for %s at %s'
1752 % (self.monitor.get_process(), type(self).__name__,
1753 self._working_directory()))
mbligh36768f02008-02-22 18:28:33 +00001754
1755
mbligh4608b002010-01-05 18:22:35 +00001756 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1757 allowed_host_statuses=None):
1758 for entry in queue_entries:
1759 if entry.status not in allowed_hqe_statuses:
1760 raise SchedulerError('Queue task attempting to start '
1761 'entry with invalid status %s: %s'
1762 % (entry.status, entry))
1763 invalid_host_status = (
1764 allowed_host_statuses is not None
1765 and entry.host.status not in allowed_host_statuses)
1766 if invalid_host_status:
1767 raise SchedulerError('Queue task attempting to start on queue '
1768 'entry with invalid host status %s: %s'
1769 % (entry.host.status, entry))
1770
1771
showardd9205182009-04-27 20:09:55 +00001772class TaskWithJobKeyvals(object):
1773 """AgentTask mixin providing functionality to help with job keyval files."""
1774 _KEYVAL_FILE = 'keyval'
1775 def _format_keyval(self, key, value):
1776 return '%s=%s' % (key, value)
1777
1778
1779 def _keyval_path(self):
1780 """Subclasses must override this"""
1781 raise NotImplemented
1782
1783
1784 def _write_keyval_after_job(self, field, value):
1785 assert self.monitor
1786 if not self.monitor.has_process():
1787 return
1788 _drone_manager.write_lines_to_file(
1789 self._keyval_path(), [self._format_keyval(field, value)],
1790 paired_with_process=self.monitor.get_process())
1791
1792
1793 def _job_queued_keyval(self, job):
1794 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1795
1796
1797 def _write_job_finished(self):
1798 self._write_keyval_after_job("job_finished", int(time.time()))
1799
1800
showarddb502762009-09-09 15:31:20 +00001801 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1802 keyval_contents = '\n'.join(self._format_keyval(key, value)
1803 for key, value in keyval_dict.iteritems())
1804 # always end with a newline to allow additional keyvals to be written
1805 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001806 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001807 keyval_contents,
1808 file_path=keyval_path)
1809
1810
1811 def _write_keyvals_before_job(self, keyval_dict):
1812 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1813
1814
1815 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001816 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001817 host.hostname)
1818 platform, all_labels = host.platform_and_labels()
1819 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1820 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1821
1822
showard8cc058f2009-09-08 16:26:33 +00001823class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001824 """
1825 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1826 """
1827
1828 TASK_TYPE = None
1829 host = None
1830 queue_entry = None
1831
showardd1195652009-12-08 22:21:02 +00001832 def __init__(self, task, extra_command_args):
1833 super(SpecialAgentTask, self).__init__()
1834
showarded2afea2009-07-07 20:54:07 +00001835 assert (self.TASK_TYPE is not None,
1836 'self.TASK_TYPE must be overridden')
showard8cc058f2009-09-08 16:26:33 +00001837
1838 self.host = Host(id=task.host.id)
1839 self.queue_entry = None
1840 if task.queue_entry:
1841 self.queue_entry = HostQueueEntry(id=task.queue_entry.id)
1842
showarded2afea2009-07-07 20:54:07 +00001843 self.task = task
1844 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001845
1846
showard8cc058f2009-09-08 16:26:33 +00001847 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001848 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1849
1850
1851 def _command_line(self):
1852 return _autoserv_command_line(self.host.hostname,
1853 self._extra_command_args,
1854 queue_entry=self.queue_entry)
1855
1856
1857 def _working_directory(self):
1858 return self.task.execution_path()
1859
1860
1861 @property
1862 def owner_username(self):
1863 if self.task.requested_by:
1864 return self.task.requested_by.login
1865 return None
showard8cc058f2009-09-08 16:26:33 +00001866
1867
showarded2afea2009-07-07 20:54:07 +00001868 def prolog(self):
1869 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001870 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001871 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001872
1873
showardde634ee2009-01-30 01:44:24 +00001874 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001875 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001876
showard2fe3f1d2009-07-06 20:19:11 +00001877 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001878 return # don't fail metahost entries, they'll be reassigned
1879
showard2fe3f1d2009-07-06 20:19:11 +00001880 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001881 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001882 return # entry has been aborted
1883
showard2fe3f1d2009-07-06 20:19:11 +00001884 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001885 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001886 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001887 self._write_keyval_after_job(queued_key, queued_time)
1888 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001889
showard8cc058f2009-09-08 16:26:33 +00001890 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001891 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001892 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001893 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001894
showard8cc058f2009-09-08 16:26:33 +00001895 pidfile_id = _drone_manager.get_pidfile_id_from(
1896 self.queue_entry.execution_path(),
1897 pidfile_name=_AUTOSERV_PID_FILE)
1898 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001899
1900 if self.queue_entry.job.parse_failed_repair:
1901 self._parse_results([self.queue_entry])
1902 else:
1903 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001904
1905
1906 def cleanup(self):
1907 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001908
1909 # We will consider an aborted task to be "Failed"
1910 self.task.finish(bool(self.success))
1911
showardf85a0b72009-10-07 20:48:45 +00001912 if self.monitor:
1913 if self.monitor.has_process():
1914 self._copy_results([self.task])
1915 if self.monitor.pidfile_id is not None:
1916 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001917
1918
class RepairTask(SpecialAgentTask):
    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task):
        """
        @param task: the models.SpecialTask for this repair.  If the task
                has an associated queue entry, that entry is marked failed
                when the repair fails.
        """
        host_protection = host_protections.Protection.get_string(
                task.host.protection)
        # normalize the protection name
        host_protection = host_protections.Protection.get_attr_name(
                host_protection)

        super(RepairTask, self).__init__(
                task, ['-R', '--host-protection', host_protection])

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if self.success:
            self.host.set_status(models.Host.Status.READY)
            return

        self.host.set_status(models.Host.Status.REPAIR_FAILED)
        if self.queue_entry:
            self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001955
1956
showarded2afea2009-07-07 20:54:07 +00001957class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001958 def _copy_to_results_repository(self):
1959 if not self.queue_entry or self.queue_entry.meta_host:
1960 return
1961
1962 self.queue_entry.set_execution_subdir()
1963 log_name = os.path.basename(self.task.execution_path())
1964 source = os.path.join(self.task.execution_path(), 'debug',
1965 'autoserv.DEBUG')
1966 destination = os.path.join(
1967 self.queue_entry.execution_path(), log_name)
1968
1969 self.monitor.try_copy_to_results_repository(
1970 source, destination_path=destination)
1971
1972
showard170873e2009-01-07 00:22:26 +00001973 def epilog(self):
1974 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001975
showard775300b2009-09-09 15:30:50 +00001976 if self.success:
1977 return
showard8fe93b52008-11-18 17:53:22 +00001978
showard775300b2009-09-09 15:30:50 +00001979 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001980
showard775300b2009-09-09 15:30:50 +00001981 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001982 # effectively ignore failure for these hosts
1983 self.success = True
showard775300b2009-09-09 15:30:50 +00001984 return
1985
1986 if self.queue_entry:
1987 self.queue_entry.requeue()
1988
1989 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001990 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001991 queue_entry__id=self.queue_entry.id):
1992 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1993 self._fail_queue_entry()
1994 return
1995
showard9bb960b2009-11-19 01:02:11 +00001996 queue_entry = models.HostQueueEntry.objects.get(
1997 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001998 else:
1999 queue_entry = None
2000
2001 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00002002 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00002003 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00002004 queue_entry=queue_entry,
2005 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00002006
showard8fe93b52008-11-18 17:53:22 +00002007
class VerifyTask(PreJobTask):
    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task):
        super(VerifyTask, self).__init__(task, ['-v'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        """Mark the host (and any queue entry) as verifying and discard
        redundant queued verify requests for the same host."""
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # Delete any other queued verifies for this host. One verify will do
        # and there's no need to keep records of other requests.
        pending_verifies = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False)
        pending_verifies = pending_verifies.exclude(id=self.task.id)
        pending_verifies.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00002042
2043
mbligh4608b002010-01-05 18:22:35 +00002044class CleanupTask(PreJobTask):
2045 # note this can also run post-job, but when it does, it's running standalone
2046 # against the host (not related to the job), so it's not considered a
2047 # PostJobTask
2048
2049 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2050
2051
2052 def __init__(self, task, recover_run_monitor=None):
2053 super(CleanupTask, self).__init__(task, ['--cleanup'])
2054 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
2055
2056
2057 def prolog(self):
2058 super(CleanupTask, self).prolog()
2059 logging.info("starting cleanup task for host: %s", self.host.hostname)
2060 self.host.set_status(models.Host.Status.CLEANING)
2061 if self.queue_entry:
2062 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2063
2064
2065 def _finish_epilog(self):
2066 if not self.queue_entry or not self.success:
2067 return
2068
2069 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
2070 should_run_verify = (
2071 self.queue_entry.job.run_verify
2072 and self.host.protection != do_not_verify_protection)
2073 if should_run_verify:
2074 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
2075 models.SpecialTask.objects.create(
2076 host=models.Host.objects.get(id=self.host.id),
2077 queue_entry=entry,
2078 task=models.SpecialTask.Task.VERIFY)
2079 else:
2080 self.queue_entry.on_pending()
2081
2082
2083 def epilog(self):
2084 super(CleanupTask, self).epilog()
2085
2086 if self.success:
2087 self.host.update_field('dirty', 0)
2088 self.host.set_status(models.Host.Status.READY)
2089
2090 self._finish_epilog()
2091
2092
class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
    """
    Common functionality for QueueTask and HostlessQueueTask
    """
    def __init__(self, queue_entries):
        # All entries belong to the same job; keep the shared Job handy.
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries


    def _keyval_path(self):
        # Keyval file lives directly in the job's working directory.
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        # The Job object builds the full autoserv invocation for its entries.
        return self.job.get_autoserv_params(self.queue_entries)


    @property
    def num_processes(self):
        # One process is accounted per queue entry.
        return len(self.queue_entries)


    @property
    def owner_username(self):
        return self.job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def prolog(self):
        """Write pre-job keyvals and move all entries to RUNNING."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = self.job.keyval_dict()
        keyval_dict[queued_key] = queued_time
        group_name = self.queue_entries[0].get_group_name()
        if group_name:
            keyval_dict['host_group_name'] = group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.set_started_on_now()


    def _write_lost_process_error_file(self):
        # Leave a marker file so the failure is visible in the results dir.
        error_file_path = os.path.join(self._working_directory(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Record job completion; no-op if the process never started."""
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()


    def _write_status_comment(self, comment):
        # Append an INFO line to status.log, paired with the job process so
        # the drone manager routes it to the right machine.
        _drone_manager.write_lines_to_file(
            os.path.join(self._working_directory(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job and when, as keyvals and a status.log
        comment.  Requires a live process to pair the writes with."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        # TODO(showard): this conditional is now obsolete, we just need to leave
        # it in temporarily for backwards compatibility over upgrades. delete
        # soon.
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002203
2204
class QueueTask(AbstractQueueTask):
    """AbstractQueueTask for ordinary, host-backed queue entries."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Sanity-check entry/host statuses, then mark hosts RUNNING."""
        hqe_allowed = (models.HostQueueEntry.Status.STARTING,
                       models.HostQueueEntry.Status.RUNNING)
        host_allowed = (models.Host.Status.PENDING,
                        models.Host.Status.RUNNING)
        self._check_queue_entry_statuses(self.queue_entries,
                                         allowed_hqe_statuses=hqe_allowed,
                                         allowed_host_statuses=host_allowed)

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)

        if self.job.synch_count == 1 and len(self.queue_entries) == 1:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _finish_task(self):
        super(QueueTask, self)._finish_task()
        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
mbligh36768f02008-02-22 18:28:33 +00002236
2237
class HostlessQueueTask(AbstractQueueTask):
    """AbstractQueueTask for a single queue entry with no host attached."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        entry = self.queue_entries[0]
        # Hostless jobs all execute under a fixed subdirectory name.
        entry.update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()
        # No gather/cleanup stage for hostless jobs; go straight to parsing.
        self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00002252
2253
class PostJobTask(AgentTask):
    """
    Base class for tasks that run over a job's results after the main
    autoserv process finishes (log gathering, parsing, archiving).

    Attaches a PidfileRunMonitor to the finished autoserv process so
    subclasses can read its exit status.
    """
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        if _testing_mode:
            return 'true'
        return self._generate_command(
                _drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        """Return True if this job's queue entries were aborted.

        All entries should agree.  If they are inconsistent, send a warning
        email listing every entry's state and conservatively assume aborted.
        """
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                # Bug fix: join over *all* queue entries.  Previously
                # ', '.join() was applied to a single formatted string,
                # which joined that string's individual characters.
                entries = ', '.join('%s (%s)' % (entry, entry.aborted)
                                    for entry in self.queue_entries)
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state: ' +
                        entries)
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        """Map the job outcome to a terminal HostQueueEntry status."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]
2329
2330
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        # Grab the job before calling super, since super's __init__ may use
        # methods that rely on our attributes.
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
            queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        # Re-run autoserv over the existing results to collect crashinfo
        # from every host in the job.
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [_autoserv_path , '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return _CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
            self.queue_entries,
            allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
            allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        """Schedule cleanups (reboots) or release hosts, per job policy."""
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            # No process means we can't trust any results: treat as failure.
            final_success = False
            num_tests_failed = 0

        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == models.RebootAfter.ALWAYS
                or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0))

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002416
2417
class SelfThrottledPostJobTask(PostJobTask):
    """
    Special AgentTask subclass that maintains its own global process limit.
    """
    # Class-wide count of processes currently started by this subclass.
    _num_running_processes = 0


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # Until our process has actually launched, keep retrying the start;
        # once it has, revert to the default tick behavior.
        if not self._process_started():
            self._try_starting_process()
            return
        super(SelfThrottledPostJobTask, self).tick()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    def _try_starting_process(self):
        if not self._can_run_new_process():
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002476
showard21baa452008-10-21 00:08:39 +00002477
class FinalReparseTask(SelfThrottledPostJobTask):
    """Runs the TKO results parser over a finished job (globally throttled)."""

    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        command = [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o']
        command.append(results_dir)
        return command


    @property
    def num_processes(self):
        # don't include parser processes in accounting
        return 0


    def _pidfile_name(self):
        return _PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        allowed = (models.HostQueueEntry.Status.PARSING,)
        self._check_queue_entry_statuses(self.queue_entries,
                                         allowed_hqe_statuses=allowed)

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002516
2517
class ArchiveResultsTask(SelfThrottledPostJobTask):
    """Runs the archive_results control file over a finished job's results."""

    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return _ARCHIVER_PID_FILE


    def _generate_command(self, results_dir):
        control_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                                    'archive_results.control.srv')
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', control_path]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        monitor = self._paired_with_monitor()
        if not self.success and monitor.has_process():
            # Leave a marker file in the results dir recording the failure.
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            _drone_manager.write_lines_to_file(
                    failed_file,
                    ['Archiving failed with exit code %s'
                     % self.monitor.exit_code()],
                    paired_with_process=monitor.get_process())
        self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002564
2565
class DBError(Exception):
    """Raised by the DBObject constructor when its select fails."""
2568
2569
class DBObject(object):
    """A miniature object relational model for the database.

    Subclasses map rows of a single table onto Python objects.  Instances
    with the same (type, id) are shared through a weak-value cache so that
    e.g. multiple HostQueueEntries referencing one Job get the same Job
    object.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id. This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id - Primary key of the row to load; fetched from the DB when
                row is not supplied.
        @param row - Pre-fetched row tuple matching _fields.
        @param new_record - True if this object represents a row not yet in
                the database; save() will INSERT it.
        @param always_query - When False, a cached, already-initialized
                instance skips re-querying the database.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """SELECT our row by primary key.  @raises DBError if not found."""
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing in
        memory fields.  Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S'  # Leave off the microseconds.
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(datetime_cmp_fmt)
                row_value = row_value.strftime(datetime_cmp_fmt)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # 'id' is never updatable through update_field().
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read our row from the database and refresh all fields."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        """Return COUNT(*) of rows in `table` matching the WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """UPDATE one column of our row (and attribute), if it changed."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object's fields if it is a new record."""
        if self.__new_record:
            keys = self._fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    # NOTE(review): values are interpolated into the SQL
                    # string rather than bound as parameters; unsafe if a
                    # field can contain quotes or untrusted text.
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """DELETE our row and invalidate this instance."""
        # Bug fix: this previously popped (type(self), id) -- using the
        # *builtin* id function -- so deleted objects were never evicted
        # from the instance cache.  Use self.id, the key __init__ inserted.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        # Prepend prefix only when string is non-empty (SQL clause helper).
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns A list with one class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
mblighe2586682008-02-29 22:45:46 +00002775
mbligh36768f02008-02-22 18:28:33 +00002776
class IneligibleHostQueue(DBObject):
    """A row blocking one (job, host) pair from being scheduled together."""
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002780
2781
class AtomicGroup(DBObject):
    """In-memory mirror of a row in the afe_atomic_groups table."""
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00002786
2787
class Label(DBObject):
    """In-memory mirror of a row in the afe_labels table."""
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        # Include atomic_group_id since scheduling decisions depend on it.
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)
2797
2798
class Host(DBObject):
    """In-memory mirror of a row in the afe_hosts table."""
    _table_name = 'afe_hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def set_status(self,status):
        # Status transitions are logged and persisted immediately.
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status',status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                # Rows are ordered by name; if a host somehow has several
                # platform labels, the last one alphabetically wins.
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    # Matches hostnames like 'rack-host007': a leading name, trailing digits.
    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes.  Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # Bug fix: int() already parses leading zeros in base 10; the old
            # lstrip('0') turned an all-zero suffix ('host0', 'host00') into
            # the empty string and int('') raised ValueError.
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
2869
2870
class HostQueueEntry(DBObject):
    """
    In-memory mirror of a row in afe_host_queue_entries: one (job, host)
    scheduling unit.  Tracks status transitions, host assignment/blocking,
    queue logging, and completion/abort handling for its Job.
    """
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')


    def __init__(self, id=None, row=None, **kwargs):
        """Construct from a database id or a prefetched row (one required)."""
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        # Every entry references a Job; host/atomic_group only when assigned.
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None

        # Per-entry log of scheduling decisions, written via the drone
        # manager under the job's results tag.
        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        # Frontend link for this entry's job, used in notification emails.
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """Assign (host truthy) or release (host None) this entry's host."""
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            # Block the host from being picked again for this job.
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def queue_log_record(self, log_line):
        # Append a timestamped line to this entry's queue log file.
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Insert an IneligibleHostQueue row for (this job, host_id)."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any IneligibleHostQueue rows for (this job, host_id)."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        # Default the results subdirectory to the assigned host's hostname.
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        """E.g. 'myhost/3 (7) Running [active]'."""
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
                                    self.status, flags_str)


    def set_status(self, status):
        """
        Set this entry's status and keep the derived active/complete flags
        in sync; fires completion handling and any status email.
        """
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)

        # active/complete are denormalized from the status; they can never
        # both be set at once.
        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        assert not (active and complete)

        self.update_field('active', active)
        self.update_field('complete', complete)

        if complete:
            self._on_complete()
            self._email_on_job_complete()

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)


    def _on_complete(self):
        self.job.stop_if_necessary()
        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in _ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _email_on_status(self, status):
        """Email the job's notification list about this entry's new status."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """If the whole job has finished, email a per-host summary."""
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        """Log the scheduling decision and kick off pre-job tasks."""
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        # Every host goes thru the Verifying stage (which may or may not
        # actually do anything as determined by get_pre_job_tasks).
        self.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        """Return this entry to Queued so it can be scheduled again."""
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        # Metahost entries give up their concrete host on requeue.
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Memoized: the hasattr check makes repeat calls free.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        """
        Abort this entry, releasing or cleaning up its host depending on
        which stage the entry is currently in.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
            assert not dispatcher.get_agents_for_entry(self)
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            # Schedule a cleanup since verification was interrupted.
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
        self.job.abort_delay_ready_task()


    def get_group_name(self):
        """Results-group name for atomic group runs ('' when not atomic)."""
        atomic_group = self.atomic_group
        if not atomic_group:
            return ''

        # Look at any meta_host and dependency labels and pick the first
        # one that also specifies this atomic group.  Use that label name
        # as the group name if possible (it is more specific).
        for label in self.get_labels():
            if label.atomic_group_id:
                assert label.atomic_group_id == atomic_group.id
                return label.name
        return atomic_group.name


    def execution_tag(self):
        assert self.execution_subdir
        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        return self.execution_tag()


    def set_started_on_now(self):
        self.update_field('started_on', datetime.datetime.now())


    def is_hostless(self):
        # An entry with no host, no metahost and no atomic group runs
        # without any machine at all.
        return (self.host_id is None
                and self.meta_host is None
                and self.atomic_group_id is None)
3200
3201
mbligh36768f02008-02-22 18:28:33 +00003202class Job(DBObject):
showardeab66ce2009-12-23 00:03:56 +00003203 _table_name = 'afe_jobs'
showard6ae5ea92009-02-25 00:11:51 +00003204 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
3205 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00003206 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00003207 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00003208
showard77182562009-06-10 00:16:05 +00003209 # This does not need to be a column in the DB. The delays are likely to
3210 # be configured short. If the scheduler is stopped and restarted in
3211 # the middle of a job's delay cycle, the delay cycle will either be
3212 # repeated or skipped depending on the number of Pending machines found
3213 # when the restarted scheduler recovers to track it. Not a problem.
3214 #
3215 # A reference to the DelayedCallTask that will wake up the job should
3216 # no other HQEs change state in time. Its end_time attribute is used
3217 # by our run_with_ready_delay() method to determine if the wait is over.
3218 _delay_ready_task = None
3219
3220 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
3221 # all status='Pending' atomic group HQEs incase a delay was running when the
3222 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00003223
    def __init__(self, id=None, row=None, **kwargs):
        """Construct from a database id or a prefetched row (one required)."""
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None # caches model instance of owner
3228
3229
showardc1a98d12010-01-15 00:22:22 +00003230 def model(self):
3231 return models.Job.objects.get(id=self.id)
3232
3233
showard9bb960b2009-11-19 01:02:11 +00003234 def owner_model(self):
3235 # work around the fact that the Job owner field is a string, not a
3236 # foreign key
3237 if not self._owner_model:
3238 self._owner_model = models.User.objects.get(login=self.owner)
3239 return self._owner_model
mbligh36768f02008-02-22 18:28:33 +00003240
mblighe2586682008-02-29 22:45:46 +00003241
jadmanski0afbb632008-06-06 21:10:57 +00003242 def is_server_job(self):
3243 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00003244
3245
showard170873e2009-01-07 00:22:26 +00003246 def tag(self):
3247 return "%s-%s" % (self.id, self.owner)
3248
3249
jadmanski0afbb632008-06-06 21:10:57 +00003250 def get_host_queue_entries(self):
3251 rows = _db.execute("""
showardeab66ce2009-12-23 00:03:56 +00003252 SELECT * FROM afe_host_queue_entries
jadmanski0afbb632008-06-06 21:10:57 +00003253 WHERE job_id= %s
3254 """, (self.id,))
3255 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00003256
jadmanski0afbb632008-06-06 21:10:57 +00003257 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00003258
jadmanski0afbb632008-06-06 21:10:57 +00003259 return entries
mbligh36768f02008-02-22 18:28:33 +00003260
3261
jadmanski0afbb632008-06-06 21:10:57 +00003262 def set_status(self, status, update_queues=False):
3263 self.update_field('status',status)
3264
3265 if update_queues:
3266 for queue_entry in self.get_host_queue_entries():
3267 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00003268
3269
showardc1a98d12010-01-15 00:22:22 +00003270 def keyval_dict(self):
3271 return self.model().keyval_dict()
3272
3273
showard77182562009-06-10 00:16:05 +00003274 def _atomic_and_has_started(self):
3275 """
3276 @returns True if any of the HostQueueEntries associated with this job
3277 have entered the Status.STARTING state or beyond.
3278 """
3279 atomic_entries = models.HostQueueEntry.objects.filter(
3280 job=self.id, atomic_group__isnull=False)
3281 if atomic_entries.count() <= 0:
3282 return False
3283
showardaf8b4ca2009-06-16 18:47:26 +00003284 # These states may *only* be reached if Job.run() has been called.
3285 started_statuses = (models.HostQueueEntry.Status.STARTING,
3286 models.HostQueueEntry.Status.RUNNING,
3287 models.HostQueueEntry.Status.COMPLETED)
3288
3289 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00003290 return started_entries.count() > 0
3291
3292
showard708b3522009-08-20 23:26:15 +00003293 def _hosts_assigned_count(self):
3294 """The number of HostQueueEntries assigned a Host for this job."""
3295 entries = models.HostQueueEntry.objects.filter(job=self.id,
3296 host__isnull=False)
3297 return entries.count()
3298
3299
showard77182562009-06-10 00:16:05 +00003300 def _pending_count(self):
3301 """The number of HostQueueEntries for this job in the Pending state."""
3302 pending_entries = models.HostQueueEntry.objects.filter(
3303 job=self.id, status=models.HostQueueEntry.Status.PENDING)
3304 return pending_entries.count()
3305
3306
showardd07a5f32009-12-07 19:36:20 +00003307 def _max_hosts_needed_to_run(self, atomic_group):
showardd2014822009-10-12 20:26:58 +00003308 """
3309 @param atomic_group: The AtomicGroup associated with this job that we
showardd07a5f32009-12-07 19:36:20 +00003310 are using to set an upper bound on the threshold.
3311 @returns The maximum number of HostQueueEntries assigned a Host before
showardd2014822009-10-12 20:26:58 +00003312 this job can run.
3313 """
3314 return min(self._hosts_assigned_count(),
3315 atomic_group.max_number_of_machines)
3316
3317
showardd07a5f32009-12-07 19:36:20 +00003318 def _min_hosts_needed_to_run(self):
3319 """Return the minumum number of hsots needed to run this job."""
3320 return self.synch_count
3321
3322
jadmanski0afbb632008-06-06 21:10:57 +00003323 def is_ready(self):
showard77182562009-06-10 00:16:05 +00003324 # NOTE: Atomic group jobs stop reporting ready after they have been
3325 # started to avoid launching multiple copies of one atomic job.
3326 # Only possible if synch_count is less than than half the number of
3327 # machines in the atomic group.
showardb000a8d2009-07-28 20:02:07 +00003328 pending_count = self._pending_count()
3329 atomic_and_has_started = self._atomic_and_has_started()
3330 ready = (pending_count >= self.synch_count
showardd2014822009-10-12 20:26:58 +00003331 and not atomic_and_has_started)
showardb000a8d2009-07-28 20:02:07 +00003332
3333 if not ready:
3334 logging.info(
3335 'Job %s not ready: %s pending, %s required '
3336 '(Atomic and started: %s)',
3337 self, pending_count, self.synch_count,
3338 atomic_and_has_started)
3339
3340 return ready
mbligh36768f02008-02-22 18:28:33 +00003341
3342
jadmanski0afbb632008-06-06 21:10:57 +00003343 def num_machines(self, clause = None):
3344 sql = "job_id=%s" % self.id
3345 if clause:
3346 sql += " AND (%s)" % clause
showardeab66ce2009-12-23 00:03:56 +00003347 return self.count(sql, table='afe_host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00003348
3349
    def num_queued(self):
        # Entries that have not completed yet.
        return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00003352
3353
    def num_active(self):
        # Entries whose 'active' flag is set.
        return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00003356
3357
    def num_complete(self):
        # Entries whose 'complete' flag is set.
        return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00003360
3361
    def is_finished(self):
        # A job is finished once every one of its queue entries is complete.
        return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00003364
mbligh36768f02008-02-22 18:28:33 +00003365
showard6bb7c292009-01-30 01:44:51 +00003366 def _not_yet_run_entries(self, include_verifying=True):
3367 statuses = [models.HostQueueEntry.Status.QUEUED,
3368 models.HostQueueEntry.Status.PENDING]
3369 if include_verifying:
3370 statuses.append(models.HostQueueEntry.Status.VERIFYING)
3371 return models.HostQueueEntry.objects.filter(job=self.id,
3372 status__in=statuses)
3373
3374
3375 def _stop_all_entries(self):
3376 entries_to_stop = self._not_yet_run_entries(
3377 include_verifying=False)
3378 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00003379 assert not child_entry.complete, (
3380 '%s status=%s, active=%s, complete=%s' %
3381 (child_entry.id, child_entry.status, child_entry.active,
3382 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00003383 if child_entry.status == models.HostQueueEntry.Status.PENDING:
3384 child_entry.host.status = models.Host.Status.READY
3385 child_entry.host.save()
3386 child_entry.status = models.HostQueueEntry.Status.STOPPED
3387 child_entry.save()
3388
showard2bab8f42008-11-12 18:15:22 +00003389 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00003390 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00003391 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00003392 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00003393
3394
jadmanski0afbb632008-06-06 21:10:57 +00003395 def write_to_machines_file(self, queue_entry):
showarda9545c02009-12-18 22:44:26 +00003396 hostname = queue_entry.host.hostname
showard170873e2009-01-07 00:22:26 +00003397 file_path = os.path.join(self.tag(), '.machines')
3398 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00003399
3400
    def _next_group_name(self, group_name=''):
        """@returns a directory name to use for the next host group results."""
        if group_name:
            # Sanitize for use as a pathname.
            group_name = group_name.replace(os.path.sep, '_')
            if group_name.startswith('.'):
                group_name = '_' + group_name[1:]
            # Add a separator between the group name and 'group%d'.
            group_name += '.'
        # Find existing '<group_name>group<N>' subdirs for this job and pick
        # N one past the largest seen (0 when there are none yet).
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00003421
3422
showarddb502762009-09-09 15:31:20 +00003423 def _write_control_file(self, execution_path):
showard170873e2009-01-07 00:22:26 +00003424 control_path = _drone_manager.attach_file_to_execution(
showarddb502762009-09-09 15:31:20 +00003425 execution_path, self.control_file)
showard170873e2009-01-07 00:22:26 +00003426 return control_path
mbligh36768f02008-02-22 18:28:33 +00003427
showardb2e2c322008-10-14 17:33:55 +00003428
showard2bab8f42008-11-12 18:15:22 +00003429 def get_group_entries(self, queue_entry_from_group):
showard8375ce02009-10-12 20:35:13 +00003430 """
3431 @param queue_entry_from_group: A HostQueueEntry instance to find other
3432 group entries on this job for.
3433
3434 @returns A list of HostQueueEntry objects all executing this job as
3435 part of the same group as the one supplied (having the same
3436 execution_subdir).
3437 """
showard2bab8f42008-11-12 18:15:22 +00003438 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00003439 return list(HostQueueEntry.fetch(
3440 where='job_id=%s AND execution_subdir=%s',
3441 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00003442
3443
showard8cc058f2009-09-08 16:26:33 +00003444 def get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00003445 assert queue_entries
showarddb502762009-09-09 15:31:20 +00003446 execution_path = queue_entries[0].execution_path()
3447 control_path = self._write_control_file(execution_path)
showarda9545c02009-12-18 22:44:26 +00003448 hostnames = ','.join(entry.host.hostname
3449 for entry in queue_entries
3450 if not entry.is_hostless())
mbligh36768f02008-02-22 18:28:33 +00003451
showarddb502762009-09-09 15:31:20 +00003452 execution_tag = queue_entries[0].execution_tag()
showard87ba02a2009-04-20 19:37:32 +00003453 params = _autoserv_command_line(
showarded2afea2009-07-07 20:54:07 +00003454 hostnames,
showard87ba02a2009-04-20 19:37:32 +00003455 ['-P', execution_tag, '-n',
3456 _drone_manager.absolute_path(control_path)],
showarde9c69362009-06-30 01:58:03 +00003457 job=self, verbose=False)
mbligh36768f02008-02-22 18:28:33 +00003458
jadmanski0afbb632008-06-06 21:10:57 +00003459 if not self.is_server_job():
3460 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00003461
showardb2e2c322008-10-14 17:33:55 +00003462 return params
mblighe2586682008-02-29 22:45:46 +00003463
mbligh36768f02008-02-22 18:28:33 +00003464
showardc9ae1782009-01-30 01:42:37 +00003465 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00003466 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00003467 return True
showard0fc38302008-10-23 00:44:07 +00003468 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showarda9545c02009-12-18 22:44:26 +00003469 return queue_entry.host.dirty
showardc9ae1782009-01-30 01:42:37 +00003470 return False
showard21baa452008-10-21 00:08:39 +00003471
showardc9ae1782009-01-30 01:42:37 +00003472
showard8cc058f2009-09-08 16:26:33 +00003473 def _should_run_verify(self, queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003474 do_not_verify = (queue_entry.host.protection ==
3475 host_protections.Protection.DO_NOT_VERIFY)
3476 if do_not_verify:
3477 return False
3478 return self.run_verify
3479
3480
showard8cc058f2009-09-08 16:26:33 +00003481 def schedule_pre_job_tasks(self, queue_entry):
showard77182562009-06-10 00:16:05 +00003482 """
3483 Get a list of tasks to perform before the host_queue_entry
3484 may be used to run this Job (such as Cleanup & Verify).
3485
3486 @returns A list of tasks to be done to the given queue_entry before
mbligh6fbdb802009-08-03 16:42:55 +00003487 it should be considered be ready to run this job. The last
showard77182562009-06-10 00:16:05 +00003488 task in the list calls HostQueueEntry.on_pending(), which
3489 continues the flow of the job.
3490 """
showardc9ae1782009-01-30 01:42:37 +00003491 if self._should_run_cleanup(queue_entry):
showard8cc058f2009-09-08 16:26:33 +00003492 task = models.SpecialTask.Task.CLEANUP
3493 elif self._should_run_verify(queue_entry):
3494 task = models.SpecialTask.Task.VERIFY
3495 else:
3496 queue_entry.on_pending()
3497 return
3498
showard9bb960b2009-11-19 01:02:11 +00003499 queue_entry = models.HostQueueEntry.objects.get(id=queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00003500 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00003501 host=models.Host.objects.get(id=queue_entry.host_id),
3502 queue_entry=queue_entry, task=task)
showard21baa452008-10-21 00:08:39 +00003503
3504
showardf1ae3542009-05-11 19:26:02 +00003505 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00003506 if len(queue_entries) == 1:
showarda9545c02009-12-18 22:44:26 +00003507 group_subdir_name = queue_entries[0].host.hostname
showard2bab8f42008-11-12 18:15:22 +00003508 else:
showardf1ae3542009-05-11 19:26:02 +00003509 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00003510 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00003511 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00003512 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003513
3514 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00003515 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003516
3517
    def _choose_group_to_run(self, include_queue_entry):
        """
        Pick the set of HostQueueEntries to run this job with and assign
        them a new execution group.

        @param include_queue_entry: A HostQueueEntry that must be part of
                the chosen group.

        @returns A list of HostQueueEntry instances to be used to run this
                Job, or an empty list if too few Pending entries exist to
                satisfy synch_count.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        # Atomic groups take as many machines as allowed; otherwise the
        # job's synch_count sets the group size.
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                    where=where_clause,
                    params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries
showard2bab8f42008-11-12 18:15:22 +00003557
3558
showard77182562009-06-10 00:16:05 +00003559 def run_if_ready(self, queue_entry):
3560 """
showard8375ce02009-10-12 20:35:13 +00003561 Run this job by kicking its HQEs into status='Starting' if enough
3562 hosts are ready for it to run.
3563
3564 Cleans up by kicking HQEs into status='Stopped' if this Job is not
3565 ready to run.
showard77182562009-06-10 00:16:05 +00003566 """
showardb2e2c322008-10-14 17:33:55 +00003567 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003568 self.stop_if_necessary()
showard8cc058f2009-09-08 16:26:33 +00003569 elif queue_entry.atomic_group:
3570 self.run_with_ready_delay(queue_entry)
3571 else:
3572 self.run(queue_entry)
showard77182562009-06-10 00:16:05 +00003573
3574
    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, a delay cannot be reset.

        If the delay is disabled, has already expired, or enough hosts are
        already pending, the job runs immediately; otherwise the entry is
        parked in status='Waiting'.  Returns nothing.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run() when ready.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        over_max_threshold = (self._pending_count() >=
                self._max_hosts_needed_to_run(queue_entry.atomic_group))
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough?  Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            self.run(queue_entry)
        else:
            queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
showard77182562009-06-10 00:16:05 +00003599
showard8cc058f2009-09-08 16:26:33 +00003600
showardd07a5f32009-12-07 19:36:20 +00003601 def request_abort(self):
3602 """Request that this Job be aborted on the next scheduler cycle."""
showardc1a98d12010-01-15 00:22:22 +00003603 self.model().abort()
showardd07a5f32009-12-07 19:36:20 +00003604
3605
    def schedule_delayed_callback_task(self, queue_entry):
        """
        Mark queue_entry Pending and, when no delay is already in flight,
        create a DelayedCallTask that re-checks this job after the
        configured atomic-group wait and either runs or aborts it.

        @param queue_entry: HostQueueEntry to pass to run() when the delay
                is up.

        @returns The new DelayedCallTask, or None if a delay was already
                set.
        """
        queue_entry.set_status(models.HostQueueEntry.Status.PENDING)

        if self._delay_ready_task:
            return None

        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self)
            # Check to see if the job is still relevant. It could have aborted
            # while we were waiting or hosts could have disappeared, etc.
            if self._pending_count() < self._min_hosts_needed_to_run():
                logging.info('Job %s had too few Pending hosts after waiting '
                             'for extras. Not running.', self)
                self.request_abort()
                return
            return self.run(queue_entry)

        logging.info('Job %s waiting up to %s seconds for more hosts.',
                     self.id, delay)
        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)
        return self._delay_ready_task
showard77182562009-06-10 00:16:05 +00003630
3631
3632 def run(self, queue_entry):
3633 """
3634 @param queue_entry: The HostQueueEntry instance calling this method.
showard77182562009-06-10 00:16:05 +00003635 """
3636 if queue_entry.atomic_group and self._atomic_and_has_started():
3637 logging.error('Job.run() called on running atomic Job %d '
3638 'with HQE %s.', self.id, queue_entry)
showard8cc058f2009-09-08 16:26:33 +00003639 return
3640 queue_entries = self._choose_group_to_run(queue_entry)
showard828fc4c2009-09-14 20:31:00 +00003641 if queue_entries:
3642 self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00003643
3644
showard8cc058f2009-09-08 16:26:33 +00003645 def _finish_run(self, queue_entries):
showardb2ccdda2008-10-28 20:39:05 +00003646 for queue_entry in queue_entries:
showard8cc058f2009-09-08 16:26:33 +00003647 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showardd2014822009-10-12 20:26:58 +00003648 self.abort_delay_ready_task()
3649
3650
3651 def abort_delay_ready_task(self):
3652 """Abort the delayed task associated with this job, if any."""
showard77182562009-06-10 00:16:05 +00003653 if self._delay_ready_task:
3654 # Cancel any pending callback that would try to run again
3655 # as we are already running.
3656 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003657
showardd2014822009-10-12 20:26:58 +00003658
showardb000a8d2009-07-28 20:02:07 +00003659 def __str__(self):
3660 return '%s-%s' % (self.id, self.owner)
3661
3662
# Entry point: start the scheduler when this file is executed directly.
if __name__ == '__main__':
    main()