blob: 6177e1332843332c70b2d0f686fbfc40c3637d95 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard89f84db2009-03-12 20:39:13 +00008import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard21baa452008-10-21 00:08:39 +000017from autotest_lib.frontend.afe import models
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
mbligh36768f02008-02-22 18:28:33 +000023RESULTS_DIR = '.'
24AUTOSERV_NICE_LEVEL = 10
showard170873e2009-01-07 00:22:26 +000025DB_CONFIG_SECTION = 'AUTOTEST_WEB'
mbligh36768f02008-02-22 18:28:33 +000026
27AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
28
29if os.environ.has_key('AUTOTEST_DIR'):
jadmanski0afbb632008-06-06 21:10:57 +000030 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
mbligh36768f02008-02-22 18:28:33 +000031AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
32AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
33
34if AUTOTEST_SERVER_DIR not in sys.path:
jadmanski0afbb632008-06-06 21:10:57 +000035 sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000036
mbligh90a549d2008-03-25 23:52:34 +000037# how long to wait for autoserv to write a pidfile
38PIDFILE_TIMEOUT = 5 * 60 # 5 min
mblighbb421852008-03-11 22:36:16 +000039
showard35162b02009-03-03 02:17:30 +000040# error message to leave in results dir when an autoserv process disappears
41# mysteriously
42_LOST_PROCESS_ERROR = """\
43Autoserv failed abnormally during execution for this job, probably due to a
44system error on the Autotest server. Full results may not be available. Sorry.
45"""
46
mbligh6f8bab42008-02-29 22:45:14 +000047_db = None
mbligh36768f02008-02-22 18:28:33 +000048_shutdown = False
showard170873e2009-01-07 00:22:26 +000049_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
50_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
mbligh4314a712008-02-29 22:44:30 +000051_testing_mode = False
showard542e8402008-09-19 20:16:18 +000052_base_url = None
showardc85c21b2008-11-24 22:17:37 +000053_notify_email_statuses = []
showard170873e2009-01-07 00:22:26 +000054_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000055
showardb18134f2009-03-20 20:52:18 +000056# load the logging settings
57scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
showard50e463b2009-04-07 18:13:45 +000058if not os.environ.has_key('AUTOTEST_SCHEDULER_LOG_DIR'):
59 os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
showardb18134f2009-03-20 20:52:18 +000060# Here we export the log name, using the same convention as autoserv's results
61# directory.
mblighc9895aa2009-04-01 18:36:58 +000062if os.environ.has_key('AUTOTEST_SCHEDULER_LOG_NAME'):
63 scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
64else:
65 scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
66 os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name
67
showardb18134f2009-03-20 20:52:18 +000068logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
69
mbligh36768f02008-02-22 18:28:33 +000070
def main():
    """Entry point: run the scheduler, logging any escaping exception."""
    try:
        main_without_exception_handling()
    except:
        # bare except: record whatever escapes (the exception is re-raised, so
        # nothing is swallowed here)
        logging.exception('Exception escaping in monitor_db')
        raise
77
78
def main_without_exception_handling():
    """Parse options, configure globals and run the dispatcher loop.

    Mutates the module globals RESULTS_DIR, _notify_email_statuses,
    _autoserv_path, _testing_mode and _base_url, starts the status server,
    then ticks the Dispatcher until _shutdown is set (see handle_sigint).
    Shutdown/cleanup (emails, status server, drones, DB) always runs, even
    when the loop dies with an exception.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to.  Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # config value is a free-form list; accept whitespace/comma/semicolon/colon
    # separators and normalize to lowercase
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # report the failure by mail but fall through to the cleanup below
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000154
155
def handle_sigint(signum, frame):
    """SIGINT handler: ask the main dispatcher loop to exit cleanly."""
    logging.info("Shutdown request received.")
    global _shutdown
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000160
161
def init(logfile):
    """One-time process initialization before the dispatcher loop starts.

    Sets up logging redirection, the pid file, the global DB connection
    (_db), Django autocommit, the SIGINT handler and the drone manager.
    Order matters: the DB override for testing must happen before connect,
    and drones are initialized last since they need RESULTS_DIR.
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # point at the stress-test database instead of the production one
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # comma-separated drone hostnames, e.g. "drone1, drone2"
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000193
194
def enable_logging(logfile):
    """Redirect stdout to *logfile* and stderr to *logfile* + '.err'.

    Both streams are opened unbuffered in append mode, and the underlying
    file descriptors are dup2'd so child processes inherit the redirection.
    """
    err_file = logfile + ".err"
    logging.info("Enabling logging to %s (%s)", logfile, err_file)
    stdout_file = open(logfile, "a", buffering=0)
    stderr_file = open(err_file, "a", buffering=0)

    os.dup2(stdout_file.fileno(), sys.stdout.fileno())
    os.dup2(stderr_file.fileno(), sys.stderr.fileno())

    sys.stdout = stdout_file
    sys.stderr = stderr_file
mbligh36768f02008-02-22 18:28:33 +0000207
208
mblighd5c95802008-03-05 00:33:46 +0000209def queue_entries_to_abort():
jadmanski0afbb632008-06-06 21:10:57 +0000210 rows = _db.execute("""
211 SELECT * FROM host_queue_entries WHERE status='Abort';
212 """)
showard2bab8f42008-11-12 18:15:22 +0000213
jadmanski0afbb632008-06-06 21:10:57 +0000214 qe = [HostQueueEntry(row=i) for i in rows]
215 return qe
mbligh36768f02008-02-22 18:28:33 +0000216
showard7cf9a9b2008-05-15 21:15:52 +0000217
showard87ba02a2009-04-20 19:37:32 +0000218def _autoserv_command_line(machines, results_dir, extra_args, job=None,
219 queue_entry=None):
220 autoserv_argv = [_autoserv_path, '-p', '-m', machines,
221 '-r', _drone_manager.absolute_path(results_dir)]
222 if job or queue_entry:
223 if not job:
224 job = queue_entry.job
225 autoserv_argv += ['-u', job.owner, '-l', job.name]
226 return autoserv_argv + extra_args
227
228
showard89f84db2009-03-12 20:39:13 +0000229class SchedulerError(Exception):
230 """Raised by HostScheduler when an inconsistent state occurs."""
231
232
showard63a34772008-08-18 19:32:50 +0000233class HostScheduler(object):
234 def _get_ready_hosts(self):
235 # avoid any host with a currently active queue entry against it
236 hosts = Host.fetch(
237 joins='LEFT JOIN host_queue_entries AS active_hqe '
238 'ON (hosts.id = active_hqe.host_id AND '
showardb1e51872008-10-07 11:08:18 +0000239 'active_hqe.active)',
showard63a34772008-08-18 19:32:50 +0000240 where="active_hqe.host_id IS NULL "
showardb1e51872008-10-07 11:08:18 +0000241 "AND NOT hosts.locked "
showard63a34772008-08-18 19:32:50 +0000242 "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
243 return dict((host.id, host) for host in hosts)
244
245
246 @staticmethod
247 def _get_sql_id_list(id_list):
248 return ','.join(str(item_id) for item_id in id_list)
249
250
251 @classmethod
showard989f25d2008-10-01 11:38:11 +0000252 def _get_many2many_dict(cls, query, id_list, flip=False):
mbligh849a0f62008-08-28 20:12:19 +0000253 if not id_list:
254 return {}
showard63a34772008-08-18 19:32:50 +0000255 query %= cls._get_sql_id_list(id_list)
256 rows = _db.execute(query)
showard989f25d2008-10-01 11:38:11 +0000257 return cls._process_many2many_dict(rows, flip)
258
259
260 @staticmethod
261 def _process_many2many_dict(rows, flip=False):
showard63a34772008-08-18 19:32:50 +0000262 result = {}
263 for row in rows:
showard89f84db2009-03-12 20:39:13 +0000264 left_id, right_id = int(row[0]), int(row[1])
showard989f25d2008-10-01 11:38:11 +0000265 if flip:
266 left_id, right_id = right_id, left_id
showard63a34772008-08-18 19:32:50 +0000267 result.setdefault(left_id, set()).add(right_id)
268 return result
269
270
271 @classmethod
272 def _get_job_acl_groups(cls, job_ids):
273 query = """
showardd9ac4452009-02-07 02:04:37 +0000274 SELECT jobs.id, acl_groups_users.aclgroup_id
showard63a34772008-08-18 19:32:50 +0000275 FROM jobs
276 INNER JOIN users ON users.login = jobs.owner
277 INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
278 WHERE jobs.id IN (%s)
279 """
280 return cls._get_many2many_dict(query, job_ids)
281
282
283 @classmethod
284 def _get_job_ineligible_hosts(cls, job_ids):
285 query = """
286 SELECT job_id, host_id
287 FROM ineligible_host_queues
288 WHERE job_id IN (%s)
289 """
290 return cls._get_many2many_dict(query, job_ids)
291
292
293 @classmethod
showard989f25d2008-10-01 11:38:11 +0000294 def _get_job_dependencies(cls, job_ids):
295 query = """
296 SELECT job_id, label_id
297 FROM jobs_dependency_labels
298 WHERE job_id IN (%s)
299 """
300 return cls._get_many2many_dict(query, job_ids)
301
302
303 @classmethod
showard63a34772008-08-18 19:32:50 +0000304 def _get_host_acls(cls, host_ids):
305 query = """
showardd9ac4452009-02-07 02:04:37 +0000306 SELECT host_id, aclgroup_id
showard63a34772008-08-18 19:32:50 +0000307 FROM acl_groups_hosts
308 WHERE host_id IN (%s)
309 """
310 return cls._get_many2many_dict(query, host_ids)
311
312
313 @classmethod
314 def _get_label_hosts(cls, host_ids):
showardfa8629c2008-11-04 16:51:23 +0000315 if not host_ids:
316 return {}, {}
showard63a34772008-08-18 19:32:50 +0000317 query = """
318 SELECT label_id, host_id
319 FROM hosts_labels
320 WHERE host_id IN (%s)
showard989f25d2008-10-01 11:38:11 +0000321 """ % cls._get_sql_id_list(host_ids)
322 rows = _db.execute(query)
323 labels_to_hosts = cls._process_many2many_dict(rows)
324 hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
325 return labels_to_hosts, hosts_to_labels
326
327
328 @classmethod
329 def _get_labels(cls):
330 return dict((label.id, label) for label in Label.fetch())
showard63a34772008-08-18 19:32:50 +0000331
332
333 def refresh(self, pending_queue_entries):
334 self._hosts_available = self._get_ready_hosts()
335
336 relevant_jobs = [queue_entry.job_id
337 for queue_entry in pending_queue_entries]
338 self._job_acls = self._get_job_acl_groups(relevant_jobs)
339 self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
showard989f25d2008-10-01 11:38:11 +0000340 self._job_dependencies = self._get_job_dependencies(relevant_jobs)
showard63a34772008-08-18 19:32:50 +0000341
342 host_ids = self._hosts_available.keys()
343 self._host_acls = self._get_host_acls(host_ids)
showard989f25d2008-10-01 11:38:11 +0000344 self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)
345
346 self._labels = self._get_labels()
showard63a34772008-08-18 19:32:50 +0000347
348
349 def _is_acl_accessible(self, host_id, queue_entry):
350 job_acls = self._job_acls.get(queue_entry.job_id, set())
351 host_acls = self._host_acls.get(host_id, set())
352 return len(host_acls.intersection(job_acls)) > 0
353
354
showard989f25d2008-10-01 11:38:11 +0000355 def _check_job_dependencies(self, job_dependencies, host_labels):
356 missing = job_dependencies - host_labels
showard89f84db2009-03-12 20:39:13 +0000357 return len(missing) == 0
showard989f25d2008-10-01 11:38:11 +0000358
359
360 def _check_only_if_needed_labels(self, job_dependencies, host_labels,
361 queue_entry):
showardade14e22009-01-26 22:38:32 +0000362 if not queue_entry.meta_host:
363 # bypass only_if_needed labels when a specific host is selected
364 return True
365
showard989f25d2008-10-01 11:38:11 +0000366 for label_id in host_labels:
367 label = self._labels[label_id]
368 if not label.only_if_needed:
369 # we don't care about non-only_if_needed labels
370 continue
371 if queue_entry.meta_host == label_id:
372 # if the label was requested in a metahost it's OK
373 continue
374 if label_id not in job_dependencies:
375 return False
376 return True
377
378
showard89f84db2009-03-12 20:39:13 +0000379 def _check_atomic_group_labels(self, host_labels, queue_entry):
380 """
381 Determine if the given HostQueueEntry's atomic group settings are okay
382 to schedule on a host with the given labels.
383
384 @param host_labels - A list of label ids that the host has.
385 @param queue_entry - The HostQueueEntry being considered for the host.
386
387 @returns True if atomic group settings are okay, False otherwise.
388 """
389 return (self._get_host_atomic_group_id(host_labels) ==
390 queue_entry.atomic_group_id)
391
392
393 def _get_host_atomic_group_id(self, host_labels):
394 """
395 Return the atomic group label id for a host with the given set of
396 labels if any, or None otherwise. Raises an exception if more than
397 one atomic group are found in the set of labels.
398
399 @param host_labels - A list of label ids that the host has.
400
401 @returns The id of the atomic group found on a label in host_labels
402 or None if no atomic group label is found.
403 @raises SchedulerError - If more than one atomic group label is found.
404 """
405 atomic_ids = [self._labels[label_id].atomic_group_id
406 for label_id in host_labels
407 if self._labels[label_id].atomic_group_id is not None]
408 if not atomic_ids:
409 return None
410 if len(atomic_ids) > 1:
411 raise SchedulerError('More than one atomic label on host.')
412 return atomic_ids[0]
413
414
415 def _get_atomic_group_labels(self, atomic_group_id):
416 """
417 Lookup the label ids that an atomic_group is associated with.
418
419 @param atomic_group_id - The id of the AtomicGroup to look up.
420
421 @returns A generator yeilding Label ids for this atomic group.
422 """
423 return (id for id, label in self._labels.iteritems()
424 if label.atomic_group_id == atomic_group_id
425 and not label.invalid)
426
427
428 def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
429 """
430 @param group_hosts - A sequence of Host ids to test for usability
431 and eligibility against the Job associated with queue_entry.
432 @param queue_entry - The HostQueueEntry that these hosts are being
433 tested for eligibility against.
434
435 @returns A subset of group_hosts Host ids that are eligible for the
436 supplied queue_entry.
437 """
438 return set(host_id for host_id in group_hosts
439 if self._is_host_usable(host_id)
440 and self._is_host_eligible_for_job(host_id, queue_entry))
441
442
showard989f25d2008-10-01 11:38:11 +0000443 def _is_host_eligible_for_job(self, host_id, queue_entry):
444 job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
445 host_labels = self._host_labels.get(host_id, set())
mblighc993bee2008-10-03 03:42:34 +0000446
showard89f84db2009-03-12 20:39:13 +0000447 return (self._is_acl_accessible(host_id, queue_entry) and
448 self._check_job_dependencies(job_dependencies, host_labels) and
449 self._check_only_if_needed_labels(
450 job_dependencies, host_labels, queue_entry) and
451 self._check_atomic_group_labels(host_labels, queue_entry))
showard989f25d2008-10-01 11:38:11 +0000452
453
showard63a34772008-08-18 19:32:50 +0000454 def _schedule_non_metahost(self, queue_entry):
showard989f25d2008-10-01 11:38:11 +0000455 if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000456 return None
457 return self._hosts_available.pop(queue_entry.host_id, None)
458
459
460 def _is_host_usable(self, host_id):
461 if host_id not in self._hosts_available:
462 # host was already used during this scheduling cycle
463 return False
464 if self._hosts_available[host_id].invalid:
465 # Invalid hosts cannot be used for metahosts. They're included in
466 # the original query because they can be used by non-metahosts.
467 return False
468 return True
469
470
471 def _schedule_metahost(self, queue_entry):
472 label_id = queue_entry.meta_host
473 hosts_in_label = self._label_hosts.get(label_id, set())
474 ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
475 set())
476
477 # must iterate over a copy so we can mutate the original while iterating
478 for host_id in list(hosts_in_label):
479 if not self._is_host_usable(host_id):
480 hosts_in_label.remove(host_id)
481 continue
482 if host_id in ineligible_host_ids:
483 continue
showard989f25d2008-10-01 11:38:11 +0000484 if not self._is_host_eligible_for_job(host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000485 continue
486
showard89f84db2009-03-12 20:39:13 +0000487 # Remove the host from our cached internal state before returning
488 # the host object.
showard63a34772008-08-18 19:32:50 +0000489 hosts_in_label.remove(host_id)
490 return self._hosts_available.pop(host_id)
491 return None
492
493
494 def find_eligible_host(self, queue_entry):
495 if not queue_entry.meta_host:
showard89f84db2009-03-12 20:39:13 +0000496 assert queue_entry.host_id is not None
showard63a34772008-08-18 19:32:50 +0000497 return self._schedule_non_metahost(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000498 assert queue_entry.atomic_group_id is None
showard63a34772008-08-18 19:32:50 +0000499 return self._schedule_metahost(queue_entry)
500
501
showard89f84db2009-03-12 20:39:13 +0000502 def find_eligible_atomic_group(self, queue_entry):
503 """
504 Given an atomic group host queue entry, locate an appropriate group
505 of hosts for the associated job to run on.
506
507 The caller is responsible for creating new HQEs for the additional
508 hosts returned in order to run the actual job on them.
509
510 @returns A list of Host instances in a ready state to satisfy this
511 atomic group scheduling. Hosts will all belong to the same
512 atomic group label as specified by the queue_entry.
513 An empty list will be returned if no suitable atomic
514 group could be found.
515
516 TODO(gps): what is responsible for kicking off any attempted repairs on
517 a group of hosts? not this function, but something needs to. We do
518 not communicate that reason for returning [] outside of here...
519 For now, we'll just be unschedulable if enough hosts within one group
520 enter Repair Failed state.
521 """
522 assert queue_entry.atomic_group_id is not None
523 job = queue_entry.job
524 assert job.synch_count and job.synch_count > 0
525 atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
526 if job.synch_count > atomic_group.max_number_of_machines:
527 # Such a Job and HostQueueEntry should never be possible to
528 # create using the frontend. Regardless, we can't process it.
529 # Abort it immediately and log an error on the scheduler.
530 queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
showard7629f142009-03-27 21:02:02 +0000531 logging.error(
532 'Error: job %d synch_count=%d > requested atomic_group %d '
533 'max_number_of_machines=%d. Aborted host_queue_entry %d.',
534 job.id, job.synch_count, atomic_group.id,
535 atomic_group.max_number_of_machines, queue_entry.id)
showard89f84db2009-03-12 20:39:13 +0000536 return []
537 hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
538 ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
539 set())
540
541 # Look in each label associated with atomic_group until we find one with
542 # enough hosts to satisfy the job.
543 for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
544 group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
545 if queue_entry.meta_host is not None:
546 # If we have a metahost label, only allow its hosts.
547 group_hosts.intersection_update(hosts_in_label)
548 group_hosts -= ineligible_host_ids
549 eligible_hosts_in_group = self._get_eligible_hosts_in_group(
550 group_hosts, queue_entry)
551
552 # Job.synch_count is treated as "minimum synch count" when
553 # scheduling for an atomic group of hosts. The atomic group
554 # number of machines is the maximum to pick out of a single
555 # atomic group label for scheduling at one time.
556 min_hosts = job.synch_count
557 max_hosts = atomic_group.max_number_of_machines
558
559 if len(eligible_hosts_in_group) < min_hosts:
560 # Not enough eligible hosts in this atomic group label.
561 continue
562
563 # Limit ourselves to scheduling the atomic group size.
564 if len(eligible_hosts_in_group) > max_hosts:
565 eligible_hosts_in_group = random.sample(
566 eligible_hosts_in_group, max_hosts)
567
568 # Remove the selected hosts from our cached internal state
569 # of available hosts in order to return the Host objects.
570 host_list = []
571 for host_id in eligible_hosts_in_group:
572 hosts_in_label.discard(host_id)
573 host_list.append(self._hosts_available.pop(host_id))
574 return host_list
575
576 return []
577
578
showard170873e2009-01-07 00:22:26 +0000579class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000580 def __init__(self):
581 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000582 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000583 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000584 user_cleanup_time = scheduler_config.config.clean_interval
585 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
586 _db, user_cleanup_time)
587 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000588 self._host_agents = {}
589 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000590
mbligh36768f02008-02-22 18:28:33 +0000591
jadmanski0afbb632008-06-06 21:10:57 +0000592 def do_initial_recovery(self, recover_hosts=True):
593 # always recover processes
594 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000595
jadmanski0afbb632008-06-06 21:10:57 +0000596 if recover_hosts:
597 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000598
599
jadmanski0afbb632008-06-06 21:10:57 +0000600 def tick(self):
showard170873e2009-01-07 00:22:26 +0000601 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000602 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000603 self._find_aborting()
604 self._schedule_new_jobs()
605 self._handle_agents()
showard170873e2009-01-07 00:22:26 +0000606 _drone_manager.execute_actions()
607 email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000608
showard97aed502008-11-04 02:01:24 +0000609
mblighf3294cc2009-04-08 21:17:38 +0000610 def _run_cleanup(self):
611 self._periodic_cleanup.run_cleanup_maybe()
612 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000613
mbligh36768f02008-02-22 18:28:33 +0000614
showard170873e2009-01-07 00:22:26 +0000615 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
616 for object_id in object_ids:
617 agent_dict.setdefault(object_id, set()).add(agent)
618
619
620 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
621 for object_id in object_ids:
622 assert object_id in agent_dict
623 agent_dict[object_id].remove(agent)
624
625
jadmanski0afbb632008-06-06 21:10:57 +0000626 def add_agent(self, agent):
627 self._agents.append(agent)
628 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000629 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
630 self._register_agent_for_ids(self._queue_entry_agents,
631 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000632
showard170873e2009-01-07 00:22:26 +0000633
634 def get_agents_for_entry(self, queue_entry):
635 """
636 Find agents corresponding to the specified queue_entry.
637 """
638 return self._queue_entry_agents.get(queue_entry.id, set())
639
640
641 def host_has_agent(self, host):
642 """
643 Determine if there is currently an Agent present using this host.
644 """
645 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000646
647
jadmanski0afbb632008-06-06 21:10:57 +0000648 def remove_agent(self, agent):
649 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000650 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
651 agent)
652 self._unregister_agent_for_ids(self._queue_entry_agents,
653 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000654
655
showard4c5374f2008-09-04 17:02:56 +0000656 def num_running_processes(self):
657 return sum(agent.num_processes for agent in self._agents
658 if agent.is_running())
mblighbb421852008-03-11 22:36:16 +0000659
660
showard170873e2009-01-07 00:22:26 +0000661 def _extract_execution_tag(self, command_line):
662 match = re.match(r'.* -P (\S+) ', command_line)
663 if not match:
664 return None
665 return match.group(1)
mblighbb421852008-03-11 22:36:16 +0000666
667
showard2bab8f42008-11-12 18:15:22 +0000668 def _recover_queue_entries(self, queue_entries, run_monitor):
669 assert len(queue_entries) > 0
showard2bab8f42008-11-12 18:15:22 +0000670 queue_task = RecoveryQueueTask(job=queue_entries[0].job,
671 queue_entries=queue_entries,
672 run_monitor=run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +0000673 self.add_agent(Agent(tasks=[queue_task],
showard170873e2009-01-07 00:22:26 +0000674 num_processes=len(queue_entries)))
mblighbb421852008-03-11 22:36:16 +0000675
676
jadmanski0afbb632008-06-06 21:10:57 +0000677 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000678 self._register_pidfiles()
679 _drone_manager.refresh()
680 self._recover_running_entries()
681 self._recover_aborting_entries()
682 self._requeue_other_active_entries()
683 self._recover_parsing_entries()
684 self._reverify_remaining_hosts()
685 # reinitialize drones after killing orphaned processes, since they can
686 # leave around files when they die
687 _drone_manager.execute_actions()
688 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000689
showard170873e2009-01-07 00:22:26 +0000690
691 def _register_pidfiles(self):
692 # during recovery we may need to read pidfiles for both running and
693 # parsing entries
694 queue_entries = HostQueueEntry.fetch(
695 where="status IN ('Running', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000696 for queue_entry in queue_entries:
showard170873e2009-01-07 00:22:26 +0000697 pidfile_id = _drone_manager.get_pidfile_id_from(
698 queue_entry.execution_tag())
699 _drone_manager.register_pidfile(pidfile_id)
700
701
702 def _recover_running_entries(self):
703 orphans = _drone_manager.get_orphaned_autoserv_processes()
704
705 queue_entries = HostQueueEntry.fetch(where="status = 'Running'")
706 requeue_entries = []
707 for queue_entry in queue_entries:
708 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000709 # synchronous job we've already recovered
710 continue
showard170873e2009-01-07 00:22:26 +0000711 execution_tag = queue_entry.execution_tag()
712 run_monitor = PidfileRunMonitor()
713 run_monitor.attach_to_existing_process(execution_tag)
714 if not run_monitor.has_process():
715 # autoserv apparently never got run, so let it get requeued
716 continue
showarde788ea62008-11-17 21:02:47 +0000717 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showardb18134f2009-03-20 20:52:18 +0000718 logging.info('Recovering %s (process %s)',
719 (', '.join(str(entry) for entry in queue_entries),
720 run_monitor.get_process()))
showard2bab8f42008-11-12 18:15:22 +0000721 self._recover_queue_entries(queue_entries, run_monitor)
showard170873e2009-01-07 00:22:26 +0000722 orphans.pop(execution_tag, None)
mbligh90a549d2008-03-25 23:52:34 +0000723
jadmanski0afbb632008-06-06 21:10:57 +0000724 # now kill any remaining autoserv processes
showard170873e2009-01-07 00:22:26 +0000725 for process in orphans.itervalues():
showardb18134f2009-03-20 20:52:18 +0000726 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000727 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000728
showard170873e2009-01-07 00:22:26 +0000729
730 def _recover_aborting_entries(self):
731 queue_entries = HostQueueEntry.fetch(
732 where='status IN ("Abort", "Aborting")')
jadmanski0afbb632008-06-06 21:10:57 +0000733 for queue_entry in queue_entries:
showardb18134f2009-03-20 20:52:18 +0000734 logging.info('Recovering aborting QE %s', queue_entry)
showard170873e2009-01-07 00:22:26 +0000735 agent = queue_entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000736
showard97aed502008-11-04 02:01:24 +0000737
showard170873e2009-01-07 00:22:26 +0000738 def _requeue_other_active_entries(self):
739 queue_entries = HostQueueEntry.fetch(
740 where='active AND NOT complete AND status != "Pending"')
741 for queue_entry in queue_entries:
742 if self.get_agents_for_entry(queue_entry):
743 # entry has already been recovered
744 continue
showardb18134f2009-03-20 20:52:18 +0000745 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
746 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000747 if queue_entry.host:
748 tasks = queue_entry.host.reverify_tasks()
749 self.add_agent(Agent(tasks))
750 agent = queue_entry.requeue()
751
752
    def _reverify_remaining_hosts(self):
        """Schedule reverification for hosts left in transient states."""
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000768
769
jadmanski0afbb632008-06-06 21:10:57 +0000770 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000771 print_message='Reverifying host %s'):
772 full_where='locked = 0 AND invalid = 0 AND ' + where
773 for host in Host.fetch(where=full_where):
774 if self.host_has_agent(host):
775 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000776 continue
showard170873e2009-01-07 00:22:26 +0000777 if print_message:
showardb18134f2009-03-20 20:52:18 +0000778 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000779 tasks = host.reverify_tasks()
780 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000781
782
showard97aed502008-11-04 02:01:24 +0000783 def _recover_parsing_entries(self):
showard2bab8f42008-11-12 18:15:22 +0000784 recovered_entry_ids = set()
showard97aed502008-11-04 02:01:24 +0000785 for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
showard2bab8f42008-11-12 18:15:22 +0000786 if entry.id in recovered_entry_ids:
787 continue
788 queue_entries = entry.job.get_group_entries(entry)
showard170873e2009-01-07 00:22:26 +0000789 recovered_entry_ids = recovered_entry_ids.union(
790 entry.id for entry in queue_entries)
showardb18134f2009-03-20 20:52:18 +0000791 logging.info('Recovering parsing entries %s',
792 (', '.join(str(entry) for entry in queue_entries)))
showard97aed502008-11-04 02:01:24 +0000793
794 reparse_task = FinalReparseTask(queue_entries)
showard170873e2009-01-07 00:22:26 +0000795 self.add_agent(Agent([reparse_task], num_processes=0))
showard97aed502008-11-04 02:01:24 +0000796
797
jadmanski0afbb632008-06-06 21:10:57 +0000798 def _recover_hosts(self):
799 # recover "Repair Failed" hosts
800 message = 'Reverifying dead host %s'
801 self._reverify_hosts_where("status = 'Repair Failed'",
802 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000803
804
showard04c82c52008-05-29 19:38:12 +0000805
    def _get_pending_queue_entries(self):
        """Return all schedulable (queued, inactive, incomplete) entries.

        Ordered by job priority (descending), then non-metahost before
        metahost, then FIFO by job id.
        """
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000812
813
showard89f84db2009-03-12 20:39:13 +0000814 def _refresh_pending_queue_entries(self):
815 """
816 Lookup the pending HostQueueEntries and call our HostScheduler
817 refresh() method given that list. Return the list.
818
819 @returns A list of pending HostQueueEntries sorted in priority order.
820 """
showard63a34772008-08-18 19:32:50 +0000821 queue_entries = self._get_pending_queue_entries()
822 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000823 return []
showardb95b1bd2008-08-15 18:11:04 +0000824
showard63a34772008-08-18 19:32:50 +0000825 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000826
showard89f84db2009-03-12 20:39:13 +0000827 return queue_entries
828
829
830 def _schedule_atomic_group(self, queue_entry):
831 """
832 Schedule the given queue_entry on an atomic group of hosts.
833
834 Returns immediately if there are insufficient available hosts.
835
836 Creates new HostQueueEntries based off of queue_entry for the
837 scheduled hosts and starts them all running.
838 """
839 # This is a virtual host queue entry representing an entire
840 # atomic group, find a group and schedule their hosts.
841 group_hosts = self._host_scheduler.find_eligible_atomic_group(
842 queue_entry)
843 if not group_hosts:
844 return
845 # The first assigned host uses the original HostQueueEntry
846 group_queue_entries = [queue_entry]
847 for assigned_host in group_hosts[1:]:
848 # Create a new HQE for every additional assigned_host.
849 new_hqe = HostQueueEntry.clone(queue_entry)
850 new_hqe.save()
851 group_queue_entries.append(new_hqe)
852 assert len(group_queue_entries) == len(group_hosts)
853 for queue_entry, host in itertools.izip(group_queue_entries,
854 group_hosts):
855 self._run_queue_entry(queue_entry, host)
856
857
858 def _schedule_new_jobs(self):
859 queue_entries = self._refresh_pending_queue_entries()
860 if not queue_entries:
861 return
862
showard63a34772008-08-18 19:32:50 +0000863 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000864 if (queue_entry.atomic_group_id is None or
865 queue_entry.host_id is not None):
866 assigned_host = self._host_scheduler.find_eligible_host(
867 queue_entry)
868 if assigned_host:
869 self._run_queue_entry(queue_entry, assigned_host)
870 else:
871 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000872
873
874 def _run_queue_entry(self, queue_entry, host):
875 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000876 # in some cases (synchronous jobs with run_verify=False), agent may be
877 # None
showard9976ce92008-10-15 20:28:13 +0000878 if agent:
879 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000880
881
jadmanski0afbb632008-06-06 21:10:57 +0000882 def _find_aborting(self):
jadmanski0afbb632008-06-06 21:10:57 +0000883 for entry in queue_entries_to_abort():
showard170873e2009-01-07 00:22:26 +0000884 agents_to_abort = list(self.get_agents_for_entry(entry))
showard1be97432008-10-17 15:30:45 +0000885 for agent in agents_to_abort:
886 self.remove_agent(agent)
887
showard170873e2009-01-07 00:22:26 +0000888 entry.abort(self, agents_to_abort)
jadmanski0afbb632008-06-06 21:10:57 +0000889
890
    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        """Decide whether an idle agent may start during this cycle.

        num_started_this_cycle: processes already started this tick.
        have_reached_limit: True once any agent has been throttled this
            cycle; later nonzero-process agents are then blocked too.
        Returns True when the agent may start.
        """
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        if agent.num_processes > _drone_manager.max_runnable_processes():
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
            scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
912
913
jadmanski0afbb632008-06-06 21:10:57 +0000914 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000915 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000916 have_reached_limit = False
917 # iterate over copy, so we can remove agents during iteration
918 for agent in list(self._agents):
919 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000920 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000921 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000922 continue
923 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000924 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000925 have_reached_limit):
926 have_reached_limit = True
927 continue
showard4c5374f2008-09-04 17:02:56 +0000928 num_started_this_cycle += agent.num_processes
929 agent.tick()
showardb18134f2009-03-20 20:52:18 +0000930 logging.info('%d running processes',
931 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000932
933
class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # True once we've given up on the process (see on_lost_process)
        self.lost_process = False
        # set when run()/attach_to_existing_process() is called
        self._start_time = None
        self.pidfile_id = None
        # cached pidfile contents (process, exit_status, num_tests_failed)
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        """Prefix command with nice when nice_level is truthy.

        NOTE(review): run() below inlines this same logic instead of calling
        this helper; the helper appears unused within this class.
        """
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        # used by _handle_no_process() to time out the pidfile wait
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """Start a new process via the drone manager and monitor its pidfile."""
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, log_file=log_file,
            pidfile_name=pidfile_name, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag):
        """Monitor an already-running process identified by its execution tag."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(execution_tag)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        """Kill the monitored process, if one exists."""
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        """Return True if a process has been seen in the pidfile."""
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        """Return the monitored process; only valid when has_process()."""
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        # raises _PidfileException (and resets state) on unparseable contents
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        # log, email, and mark the process lost so callers see a failure
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            # state was already frozen by on_lost_process()
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        # PIDFILE_TIMEOUT is a module-level constant defined outside this
        # class
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
        self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        """Return the process exit status, or None while still running."""
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """Return the number of failed tests recorded in the pidfile."""
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001089
1090
class Agent(object):
    """Runs an ordered queue of AgentTasks for the dispatcher.

    When a task fails, the rest of the queue is dropped and the failed
    task's failure_tasks are handed to a brand-new Agent.
    """
    def __init__(self, tasks, num_processes=1):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.num_processes = num_processes

        # union the ids across all tasks so the dispatcher can look this
        # agent up by host or queue-entry id
        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        for task in tasks:
            self.add_task(task)


    def _union_ids(self, id_lists):
        # flatten the per-task id lists into a single set
        return set(itertools.chain(*id_lists))


    def add_task(self, task):
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        # keep advancing until we run out of tasks or hit one still running
        while not self.is_done():
            current = self.active_task
            if current and not current.is_done():
                current.poll()
                if not current.is_done():
                    return
            self._next_task()


    def _next_task(self):
        logging.info("agent picking task")
        finished_task = self.active_task
        if finished_task:
            assert finished_task.is_done()

            if not finished_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
        if self.active_task:
            self.active_task.start()


    def on_task_failure(self):
        # drop everything still queued on this agent
        self.queue = Queue.Queue(0)
        # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
        # get reset.
        self.dispatcher.add_agent(Agent(self.active_task.failure_tasks))


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001159
jadmanski0afbb632008-06-06 21:10:57 +00001160
class AgentTask(object):
    """Base class for a single unit of work executed by an Agent.

    Lifecycle: start() -> poll()/tick() while running -> finished() ->
    epilog().  Subclasses override prolog()/epilog() and usually provide an
    autoserv command line.
    """
    def __init__(self, cmd, working_directory=None, failure_tasks=None):
        """
        @param cmd: command line (list) to run, or None for tasks that do no
                work themselves.
        @param working_directory: directory the monitored process runs in.
        @param failure_tasks: tasks to run (in a fresh Agent) if this task
                fails.  Default is None rather than a mutable [] so that
                instances never share a single list object.
        """
        self.done = False
        self.failure_tasks = failure_tasks if failure_tasks is not None else []
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        # record which hosts/queue entries this task touches, so the
        # dispatcher can find its agent by id
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            # no process was ever started -- treat as a failure
            self.finished(False)


    def tick(self, exit_code):
        """Advance the task given the process exit code (None = running)."""
        if exit_code is None:
            return
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook run once before the task starts; overridden by subclasses."""
        pass


    def create_temp_resultsdir(self, suffix=''):
        # NOTE(review): `suffix` is accepted but ignored -- the drone manager
        # is always asked for an 'agent_task' path.  Confirm whether any
        # caller still relies on suffix-specific directories.
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        # preserve our log file in the results repository, if we have one
        if self.monitor and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        """Hook run once after the task finishes; overridden by subclasses."""
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        # timestamp the filename so successive runs don't clobber each other
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        """Return the execution tag shared by all entries; assert they agree."""
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_and_parse_results(self, queue_entries):
        """Copy results to the repository and schedule a final reparse."""
        assert len(queue_entries) > 0
        assert self.monitor
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(self.monitor.get_process(),
                                                  results_path)

        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def run(self):
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001283
1284
class RepairTask(AgentTask):
    """Runs autoserv -R against a host; on success the host becomes Ready,
    otherwise Repair Failed (and the associated queue entry, if any, is
    failed)."""
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection_name = host_protections.Protection.get_string(
            host.protection)
        # normalize the protection name
        protection_name = host_protections.Protection.get_attr_name(
            protection_name)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.create_temp_resultsdir('.repair')
        repair_cmd = _autoserv_command_line(
            host.hostname, self.temp_results_dir,
            ['-R', '--host-protection', protection_name],
            queue_entry=queue_entry)
        super(RepairTask, self).__init__(repair_cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _fail_queue_entry(self):
        entry = self.queue_entry_to_fail
        assert entry

        if entry.meta_host:
            return  # don't fail metahost entries, they'll be reassigned

        entry.update_from_database()
        if entry.status != 'Queued':
            return  # entry has been aborted

        entry.set_execution_subdir()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=entry.execution_tag() + '/')

        self._copy_and_parse_results([entry])
        entry.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001345
1346
class PreJobTask(AgentTask):
    """Base for tasks run ahead of a job; when such a task fails for a
    non-metahost entry, its log file is copied into the job's results."""
    def epilog(self):
        super(PreJobTask, self).epilog()
        entry = self.queue_entry
        # nothing to copy unless we failed a concrete (non-metahost) entry
        if not entry or self.success or entry.meta_host:
            return
        entry.set_execution_subdir()
        destination = os.path.join(entry.execution_tag(),
                                   os.path.basename(self.log_file))
        _drone_manager.copy_to_results_repository(
            self.monitor.get_process(), self.log_file,
            destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001359
1360
class VerifyTask(PreJobTask):
    """Runs autoserv -v against a host, falling back to a RepairTask on
    failure."""
    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry / host must be supplied
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        verify_cmd = _autoserv_command_line(
            self.host.hostname, self.temp_results_dir, ['-v'],
            queue_entry=queue_entry)
        repair_on_failure = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(verify_cmd, self.temp_results_dir,
                                         failure_tasks=repair_on_failure)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001391
1392
mbligh36768f02008-02-22 18:28:33 +00001393class QueueTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001394 def __init__(self, job, queue_entries, cmd):
jadmanski0afbb632008-06-06 21:10:57 +00001395 self.job = job
1396 self.queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001397 super(QueueTask, self).__init__(cmd, self._execution_tag())
1398 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001399
1400
showard170873e2009-01-07 00:22:26 +00001401 def _format_keyval(self, key, value):
1402 return '%s=%s' % (key, value)
mbligh36768f02008-02-22 18:28:33 +00001403
1404
showard73ec0442009-02-07 02:05:20 +00001405 def _keyval_path(self):
1406 return os.path.join(self._execution_tag(), 'keyval')
1407
1408
1409 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1410 keyval_contents = '\n'.join(self._format_keyval(key, value)
1411 for key, value in keyval_dict.iteritems())
1412 # always end with a newline to allow additional keyvals to be written
1413 keyval_contents += '\n'
1414 _drone_manager.attach_file_to_execution(self._execution_tag(),
1415 keyval_contents,
1416 file_path=keyval_path)
1417
1418
1419 def _write_keyvals_before_job(self, keyval_dict):
1420 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1421
1422
1423 def _write_keyval_after_job(self, field, value):
showard170873e2009-01-07 00:22:26 +00001424 assert self.monitor and self.monitor.has_process()
showard170873e2009-01-07 00:22:26 +00001425 _drone_manager.write_lines_to_file(
showard73ec0442009-02-07 02:05:20 +00001426 self._keyval_path(), [self._format_keyval(field, value)],
showard35162b02009-03-03 02:17:30 +00001427 paired_with_process=self.monitor.get_process())
showardd8e548a2008-09-09 03:04:57 +00001428
1429
showard170873e2009-01-07 00:22:26 +00001430 def _write_host_keyvals(self, host):
1431 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1432 host.hostname)
1433 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001434 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1435 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001436
1437
showard170873e2009-01-07 00:22:26 +00001438 def _execution_tag(self):
1439 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001440
1441
jadmanski0afbb632008-06-06 21:10:57 +00001442 def prolog(self):
showard73ec0442009-02-07 02:05:20 +00001443 queued = int(time.mktime(self.job.created_on.timetuple()))
1444 self._write_keyvals_before_job({'job_queued': queued})
jadmanski0afbb632008-06-06 21:10:57 +00001445 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001446 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001447 queue_entry.set_status('Running')
1448 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001449 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001450 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001451 assert len(self.queue_entries) == 1
1452 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001453
1454
showard35162b02009-03-03 02:17:30 +00001455 def _write_lost_process_error_file(self):
1456 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1457 _drone_manager.write_lines_to_file(error_file_path,
1458 [_LOST_PROCESS_ERROR])
1459
1460
showard97aed502008-11-04 02:01:24 +00001461 def _finish_task(self, success):
showard35162b02009-03-03 02:17:30 +00001462 if self.monitor.has_process():
1463 self._write_keyval_after_job("job_finished", int(time.time()))
1464 self._copy_and_parse_results(self.queue_entries)
1465
1466 if self.monitor.lost_process:
1467 self._write_lost_process_error_file()
1468 for queue_entry in self.queue_entries:
1469 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001470
1471
showardcbd74612008-11-19 21:42:02 +00001472 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001473 _drone_manager.write_lines_to_file(
1474 os.path.join(self._execution_tag(), 'status.log'),
1475 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001476 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001477
1478
jadmanskif7fa2cc2008-10-01 14:13:23 +00001479 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001480 if not self.monitor or not self.monitor.has_process():
1481 return
1482
jadmanskif7fa2cc2008-10-01 14:13:23 +00001483 # build up sets of all the aborted_by and aborted_on values
1484 aborted_by, aborted_on = set(), set()
1485 for queue_entry in self.queue_entries:
1486 if queue_entry.aborted_by:
1487 aborted_by.add(queue_entry.aborted_by)
1488 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1489 aborted_on.add(t)
1490
1491 # extract some actual, unique aborted by value and write it out
1492 assert len(aborted_by) <= 1
1493 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001494 aborted_by_value = aborted_by.pop()
1495 aborted_on_value = max(aborted_on)
1496 else:
1497 aborted_by_value = 'autotest_system'
1498 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001499
showarda0382352009-02-11 23:36:43 +00001500 self._write_keyval_after_job("aborted_by", aborted_by_value)
1501 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001502
showardcbd74612008-11-19 21:42:02 +00001503 aborted_on_string = str(datetime.datetime.fromtimestamp(
1504 aborted_on_value))
1505 self._write_status_comment('Job aborted by %s on %s' %
1506 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001507
1508
jadmanski0afbb632008-06-06 21:10:57 +00001509 def abort(self):
1510 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001511 self._log_abort()
showard97aed502008-11-04 02:01:24 +00001512 self._finish_task(False)
jadmanskic2ac77f2008-05-16 21:44:04 +00001513
1514
showard21baa452008-10-21 00:08:39 +00001515 def _reboot_hosts(self):
1516 reboot_after = self.job.reboot_after
1517 do_reboot = False
showard0fc38302008-10-23 00:44:07 +00001518 if reboot_after == models.RebootAfter.ALWAYS:
showard21baa452008-10-21 00:08:39 +00001519 do_reboot = True
showard0fc38302008-10-23 00:44:07 +00001520 elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
showard21baa452008-10-21 00:08:39 +00001521 num_tests_failed = self.monitor.num_tests_failed()
1522 do_reboot = (self.success and num_tests_failed == 0)
1523
showard8ebca792008-11-04 21:54:22 +00001524 for queue_entry in self.queue_entries:
1525 if do_reboot:
showard45ae8192008-11-05 19:32:53 +00001526 # don't pass the queue entry to the CleanupTask. if the cleanup
showardfa8629c2008-11-04 16:51:23 +00001527 # fails, the job doesn't care -- it's over.
showard45ae8192008-11-05 19:32:53 +00001528 cleanup_task = CleanupTask(host=queue_entry.get_host())
1529 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
showard8ebca792008-11-04 21:54:22 +00001530 else:
1531 queue_entry.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001532
1533
jadmanski0afbb632008-06-06 21:10:57 +00001534 def epilog(self):
1535 super(QueueTask, self).epilog()
showard97aed502008-11-04 02:01:24 +00001536 self._finish_task(self.success)
showard21baa452008-10-21 00:08:39 +00001537 self._reboot_hosts()
mblighbb421852008-03-11 22:36:16 +00001538
showardb18134f2009-03-20 20:52:18 +00001539 logging.info("queue_task finished with succes=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001540
1541
mblighbb421852008-03-11 22:36:16 +00001542class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001543 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001544 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
jadmanski0afbb632008-06-06 21:10:57 +00001545 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001546
1547
jadmanski0afbb632008-06-06 21:10:57 +00001548 def run(self):
1549 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001550
1551
jadmanski0afbb632008-06-06 21:10:57 +00001552 def prolog(self):
1553 # recovering an existing process - don't do prolog
1554 pass
mblighbb421852008-03-11 22:36:16 +00001555
1556
showard8fe93b52008-11-18 17:53:22 +00001557class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001558 def __init__(self, host=None, queue_entry=None):
1559 assert bool(host) ^ bool(queue_entry)
1560 if queue_entry:
1561 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001562 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001563 self.host = host
showard170873e2009-01-07 00:22:26 +00001564
1565 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00001566 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1567 ['--cleanup'],
1568 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001569 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001570 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1571 failure_tasks=[repair_task])
1572
1573 self._set_ids(host=host, queue_entries=[queue_entry])
1574 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001575
mblighd5c95802008-03-05 00:33:46 +00001576
jadmanski0afbb632008-06-06 21:10:57 +00001577 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001578 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001579 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001580 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001581
mblighd5c95802008-03-05 00:33:46 +00001582
showard21baa452008-10-21 00:08:39 +00001583 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001584 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001585 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001586 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001587 self.host.update_field('dirty', 0)
1588
1589
mblighd5c95802008-03-05 00:33:46 +00001590class AbortTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001591 def __init__(self, queue_entry, agents_to_abort):
jadmanski0afbb632008-06-06 21:10:57 +00001592 super(AbortTask, self).__init__('')
showard170873e2009-01-07 00:22:26 +00001593 self.queue_entry = queue_entry
1594 # don't use _set_ids, since we don't want to set the host_ids
1595 self.queue_entry_ids = [queue_entry.id]
1596 self.agents_to_abort = agents_to_abort
mbligh36768f02008-02-22 18:28:33 +00001597
1598
jadmanski0afbb632008-06-06 21:10:57 +00001599 def prolog(self):
showardb18134f2009-03-20 20:52:18 +00001600 logging.info("starting abort on host %s, job %s",
1601 self.queue_entry.host_id, self.queue_entry.job_id)
mbligh36768f02008-02-22 18:28:33 +00001602
mblighd64e5702008-04-04 21:39:28 +00001603
jadmanski0afbb632008-06-06 21:10:57 +00001604 def epilog(self):
1605 super(AbortTask, self).epilog()
1606 self.queue_entry.set_status('Aborted')
1607 self.success = True
1608
1609
1610 def run(self):
1611 for agent in self.agents_to_abort:
1612 if (agent.active_task):
1613 agent.active_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001614
1615
showard97aed502008-11-04 02:01:24 +00001616class FinalReparseTask(AgentTask):
showard97aed502008-11-04 02:01:24 +00001617 _num_running_parses = 0
1618
1619 def __init__(self, queue_entries):
1620 self._queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001621 # don't use _set_ids, since we don't want to set the host_ids
1622 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00001623 self._parse_started = False
1624
1625 assert len(queue_entries) > 0
1626 queue_entry = queue_entries[0]
showard97aed502008-11-04 02:01:24 +00001627
showard170873e2009-01-07 00:22:26 +00001628 self._execution_tag = queue_entry.execution_tag()
1629 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1630 self._autoserv_monitor = PidfileRunMonitor()
1631 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1632 self._final_status = self._determine_final_status()
1633
showard97aed502008-11-04 02:01:24 +00001634 if _testing_mode:
1635 self.cmd = 'true'
showard170873e2009-01-07 00:22:26 +00001636 else:
1637 super(FinalReparseTask, self).__init__(
1638 cmd=self._generate_parse_command(),
1639 working_directory=self._execution_tag)
showard97aed502008-11-04 02:01:24 +00001640
showard170873e2009-01-07 00:22:26 +00001641 self.log_file = os.path.join(self._execution_tag, '.parse.log')
showard97aed502008-11-04 02:01:24 +00001642
1643
1644 @classmethod
1645 def _increment_running_parses(cls):
1646 cls._num_running_parses += 1
1647
1648
1649 @classmethod
1650 def _decrement_running_parses(cls):
1651 cls._num_running_parses -= 1
1652
1653
1654 @classmethod
1655 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00001656 return (cls._num_running_parses <
1657 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00001658
1659
showard170873e2009-01-07 00:22:26 +00001660 def _determine_final_status(self):
1661 # we'll use a PidfileRunMonitor to read the autoserv exit status
1662 if self._autoserv_monitor.exit_code() == 0:
1663 return models.HostQueueEntry.Status.COMPLETED
1664 return models.HostQueueEntry.Status.FAILED
1665
1666
showard97aed502008-11-04 02:01:24 +00001667 def prolog(self):
1668 super(FinalReparseTask, self).prolog()
1669 for queue_entry in self._queue_entries:
1670 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
1671
1672
1673 def epilog(self):
1674 super(FinalReparseTask, self).epilog()
showard97aed502008-11-04 02:01:24 +00001675 for queue_entry in self._queue_entries:
showard170873e2009-01-07 00:22:26 +00001676 queue_entry.set_status(self._final_status)
showard97aed502008-11-04 02:01:24 +00001677
1678
showard2bab8f42008-11-12 18:15:22 +00001679 def _generate_parse_command(self):
showard170873e2009-01-07 00:22:26 +00001680 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
1681 self._results_dir]
showard97aed502008-11-04 02:01:24 +00001682
1683
1684 def poll(self):
1685 # override poll to keep trying to start until the parse count goes down
1686 # and we can, at which point we revert to default behavior
1687 if self._parse_started:
1688 super(FinalReparseTask, self).poll()
1689 else:
1690 self._try_starting_parse()
1691
1692
1693 def run(self):
1694 # override run() to not actually run unless we can
1695 self._try_starting_parse()
1696
1697
1698 def _try_starting_parse(self):
1699 if not self._can_run_new_parse():
1700 return
showard170873e2009-01-07 00:22:26 +00001701
showard678df4f2009-02-04 21:36:39 +00001702 # make sure we actually have results to parse
showard35162b02009-03-03 02:17:30 +00001703 # this should never happen in normal operation
showard678df4f2009-02-04 21:36:39 +00001704 if not self._autoserv_monitor.has_process():
1705 email_manager.manager.enqueue_notify_email(
1706 'No results to parse',
1707 'No results to parse at %s' % self._autoserv_monitor.pidfile_id)
1708 self.finished(False)
1709 return
1710
showard97aed502008-11-04 02:01:24 +00001711 # actually run the parse command
showard170873e2009-01-07 00:22:26 +00001712 self.monitor = PidfileRunMonitor()
1713 self.monitor.run(self.cmd, self._working_directory,
1714 log_file=self.log_file,
1715 pidfile_name='.parser_execute',
1716 paired_with_pidfile=self._autoserv_monitor.pidfile_id)
1717
showard97aed502008-11-04 02:01:24 +00001718 self._increment_running_parses()
1719 self._parse_started = True
1720
1721
1722 def finished(self, success):
1723 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00001724 if self._parse_started:
1725 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001726
1727
showardc9ae1782009-01-30 01:42:37 +00001728class SetEntryPendingTask(AgentTask):
1729 def __init__(self, queue_entry):
1730 super(SetEntryPendingTask, self).__init__(cmd='')
1731 self._queue_entry = queue_entry
1732 self._set_ids(queue_entries=[queue_entry])
1733
1734
1735 def run(self):
1736 agent = self._queue_entry.on_pending()
1737 if agent:
1738 self.agent.dispatcher.add_agent(agent)
1739 self.finished(True)
1740
1741
showarda3c58572009-03-12 20:36:59 +00001742class DBError(Exception):
1743 """Raised by the DBObject constructor when its select fails."""
1744
1745
mbligh36768f02008-02-22 18:28:33 +00001746class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00001747 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00001748
1749 # Subclasses MUST override these:
1750 _table_name = ''
1751 _fields = ()
1752
showarda3c58572009-03-12 20:36:59 +00001753 # A mapping from (type, id) to the instance of the object for that
1754 # particular id. This prevents us from creating new Job() and Host()
1755 # instances for every HostQueueEntry object that we instantiate as
1756 # multiple HQEs often share the same Job.
1757 _instances_by_type_and_id = weakref.WeakValueDictionary()
1758 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001759
showarda3c58572009-03-12 20:36:59 +00001760
1761 def __new__(cls, id=None, **kwargs):
1762 """
1763 Look to see if we already have an instance for this particular type
1764 and id. If so, use it instead of creating a duplicate instance.
1765 """
1766 if id is not None:
1767 instance = cls._instances_by_type_and_id.get((cls, id))
1768 if instance:
1769 return instance
1770 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
1771
1772
1773 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00001774 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00001775 assert self._table_name, '_table_name must be defined in your class'
1776 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00001777 if not new_record:
1778 if self._initialized and not always_query:
1779 return # We've already been initialized.
1780 if id is None:
1781 id = row[0]
1782 # Tell future constructors to use us instead of re-querying while
1783 # this instance is still around.
1784 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00001785
showard6ae5ea92009-02-25 00:11:51 +00001786 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00001787
jadmanski0afbb632008-06-06 21:10:57 +00001788 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00001789
jadmanski0afbb632008-06-06 21:10:57 +00001790 if row is None:
showardccbd6c52009-03-21 00:10:21 +00001791 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00001792
showarda3c58572009-03-12 20:36:59 +00001793 if self._initialized:
1794 differences = self._compare_fields_in_row(row)
1795 if differences:
showard7629f142009-03-27 21:02:02 +00001796 logging.warn(
1797 'initialized %s %s instance requery is updating: %s',
1798 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00001799 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00001800 self._initialized = True
1801
1802
1803 @classmethod
1804 def _clear_instance_cache(cls):
1805 """Used for testing, clear the internal instance cache."""
1806 cls._instances_by_type_and_id.clear()
1807
1808
showardccbd6c52009-03-21 00:10:21 +00001809 def _fetch_row_from_db(self, row_id):
1810 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
1811 rows = _db.execute(sql, (row_id,))
1812 if not rows:
showard76e29d12009-04-15 21:53:10 +00001813 raise DBError("row not found (table=%s, row id=%s)"
1814 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00001815 return rows[0]
1816
1817
showarda3c58572009-03-12 20:36:59 +00001818 def _assert_row_length(self, row):
1819 assert len(row) == len(self._fields), (
1820 "table = %s, row = %s/%d, fields = %s/%d" % (
1821 self.__table, row, len(row), self._fields, len(self._fields)))
1822
1823
1824 def _compare_fields_in_row(self, row):
1825 """
1826 Given a row as returned by a SELECT query, compare it to our existing
1827 in memory fields.
1828
1829 @param row - A sequence of values corresponding to fields named in
1830 The class attribute _fields.
1831
1832 @returns A dictionary listing the differences keyed by field name
1833 containing tuples of (current_value, row_value).
1834 """
1835 self._assert_row_length(row)
1836 differences = {}
1837 for field, row_value in itertools.izip(self._fields, row):
1838 current_value = getattr(self, field)
1839 if current_value != row_value:
1840 differences[field] = (current_value, row_value)
1841 return differences
showard2bab8f42008-11-12 18:15:22 +00001842
1843
1844 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00001845 """
1846 Update our field attributes using a single row returned by SELECT.
1847
1848 @param row - A sequence of values corresponding to fields named in
1849 the class fields list.
1850 """
1851 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00001852
showard2bab8f42008-11-12 18:15:22 +00001853 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00001854 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00001855 setattr(self, field, value)
1856 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00001857
showard2bab8f42008-11-12 18:15:22 +00001858 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00001859
mblighe2586682008-02-29 22:45:46 +00001860
showardccbd6c52009-03-21 00:10:21 +00001861 def update_from_database(self):
1862 assert self.id is not None
1863 row = self._fetch_row_from_db(self.id)
1864 self._update_fields_from_row(row)
1865
1866
jadmanski0afbb632008-06-06 21:10:57 +00001867 def count(self, where, table = None):
1868 if not table:
1869 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00001870
jadmanski0afbb632008-06-06 21:10:57 +00001871 rows = _db.execute("""
1872 SELECT count(*) FROM %s
1873 WHERE %s
1874 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00001875
jadmanski0afbb632008-06-06 21:10:57 +00001876 assert len(rows) == 1
1877
1878 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00001879
1880
mblighf8c624d2008-07-03 16:58:45 +00001881 def update_field(self, field, value, condition=''):
showard2bab8f42008-11-12 18:15:22 +00001882 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00001883
showard2bab8f42008-11-12 18:15:22 +00001884 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00001885 return
mbligh36768f02008-02-22 18:28:33 +00001886
mblighf8c624d2008-07-03 16:58:45 +00001887 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
1888 if condition:
1889 query += ' AND (%s)' % condition
jadmanski0afbb632008-06-06 21:10:57 +00001890 _db.execute(query, (value, self.id))
1891
showard2bab8f42008-11-12 18:15:22 +00001892 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00001893
1894
jadmanski0afbb632008-06-06 21:10:57 +00001895 def save(self):
1896 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00001897 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00001898 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00001899 values = []
1900 for key in keys:
1901 value = getattr(self, key)
1902 if value is None:
1903 values.append('NULL')
1904 else:
1905 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00001906 values_str = ','.join(values)
1907 query = ('INSERT INTO %s (%s) VALUES (%s)' %
1908 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00001909 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00001910 # Update our id to the one the database just assigned to us.
1911 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00001912
1913
jadmanski0afbb632008-06-06 21:10:57 +00001914 def delete(self):
showarda3c58572009-03-12 20:36:59 +00001915 self._instances_by_type_and_id.pop((type(self), id), None)
1916 self._initialized = False
1917 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00001918 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
1919 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00001920
1921
showard63a34772008-08-18 19:32:50 +00001922 @staticmethod
1923 def _prefix_with(string, prefix):
1924 if string:
1925 string = prefix + string
1926 return string
1927
1928
jadmanski0afbb632008-06-06 21:10:57 +00001929 @classmethod
showard989f25d2008-10-01 11:38:11 +00001930 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00001931 """
1932 Construct instances of our class based on the given database query.
1933
1934 @yields One class instance for each row fetched.
1935 """
showard63a34772008-08-18 19:32:50 +00001936 order_by = cls._prefix_with(order_by, 'ORDER BY ')
1937 where = cls._prefix_with(where, 'WHERE ')
1938 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00001939 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00001940 'joins' : joins,
1941 'where' : where,
1942 'order_by' : order_by})
1943 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00001944 for row in rows:
1945 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00001946
mbligh36768f02008-02-22 18:28:33 +00001947
class IneligibleHostQueue(DBObject):
    """ORM wrapper for a row of the ineligible_host_queues table."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00001951
1952
showard89f84db2009-03-12 20:39:13 +00001953class AtomicGroup(DBObject):
1954 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00001955 _fields = ('id', 'name', 'description', 'max_number_of_machines',
1956 'invalid')
showard89f84db2009-03-12 20:39:13 +00001957
1958
showard989f25d2008-10-01 11:38:11 +00001959class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00001960 _table_name = 'labels'
1961 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00001962 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00001963
1964
mbligh36768f02008-02-22 18:28:33 +00001965class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00001966 _table_name = 'hosts'
1967 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
1968 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
1969
1970
jadmanski0afbb632008-06-06 21:10:57 +00001971 def current_task(self):
1972 rows = _db.execute("""
1973 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
1974 """, (self.id,))
1975
1976 if len(rows) == 0:
1977 return None
1978 else:
1979 assert len(rows) == 1
1980 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00001981 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00001982
1983
jadmanski0afbb632008-06-06 21:10:57 +00001984 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00001985 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001986 if self.current_task():
1987 self.current_task().requeue()
1988
showard6ae5ea92009-02-25 00:11:51 +00001989
jadmanski0afbb632008-06-06 21:10:57 +00001990 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00001991 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00001992 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00001993
1994
showard170873e2009-01-07 00:22:26 +00001995 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00001996 """
showard170873e2009-01-07 00:22:26 +00001997 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00001998 """
1999 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002000 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002001 FROM labels
2002 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002003 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002004 ORDER BY labels.name
2005 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002006 platform = None
2007 all_labels = []
2008 for label_name, is_platform in rows:
2009 if is_platform:
2010 platform = label_name
2011 all_labels.append(label_name)
2012 return platform, all_labels
2013
2014
2015 def reverify_tasks(self):
2016 cleanup_task = CleanupTask(host=self)
2017 verify_task = VerifyTask(host=self)
2018 # just to make sure this host does not get taken away
2019 self.set_status('Cleaning')
2020 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002021
2022
class HostQueueEntry(DBObject):
    """One row of the host_queue_entries table.

    Represents a single host's slot in a job: which host (if any) is
    assigned, the entry's scheduling status, and the execution subdirectory
    its results are written under.
    """
    _table_name = 'host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id')


    def __init__(self, id=None, row=None, **kwargs):
        """Load an entry either by database id or from a pre-fetched row.

        Also eagerly loads the owning Job and, when a host is assigned,
        the Host object.
        """
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        # Per-entry scheduler log, kept under the job's results tag.
        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        """Return the frontend URL for viewing this entry's job."""
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def set_host(self, host):
        """Assign (or, with host=None, release) a host for this entry.

        Assigning marks the entry active and blocks the host against other
        entries of the same job; releasing removes that block.
        """
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        """Return the assigned Host object, or None."""
        return self.host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log file."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Create an IneligibleHostQueue row so this job won't reuse host_id."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Delete any IneligibleHostQueue rows blocking host_id for this job."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set the results subdirectory; defaults to the assigned hostname."""
        if subdir is None:
            assert self.get_host()
            subdir = self.get_host().hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        """Return the assigned hostname, or 'no host' when unassigned."""
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)


    def set_status(self, status):
        """Update this entry's status plus the derived active/complete flags.

        A non-abort status only overwrites the row if it is not currently in
        an abort state (guard condition on the UPDATE); abort statuses always
        win.  May send notification email depending on _notify_email_statuses.
        """
        abort_statuses = ['Abort', 'Aborting', 'Aborted']
        if status not in abort_statuses:
            # Refuse to clobber an in-progress abort with a normal status.
            condition = ' AND '.join(['status <> "%s"' % x
                                      for x in abort_statuses])
        else:
            condition = ''
        self.update_field('status', status, condition=condition)

        logging.info("%s -> %s", self, self.status)

        if status in ['Queued', 'Parsing']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Aborting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _email_on_status(self, status):
        """Email the job's notification list about this entry's new status."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """If the whole job has finished, email a per-host summary."""
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def run(self, assigned_host=None):
        """Start (or continue preparing) this entry; returns the job's Agent.

        Meta-host and atomic-group entries have no fixed host and must be
        given one here.
        """
        if self.meta_host is not None or self.atomic_group_id is not None:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.set_host(assigned_host)

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        return self.job.run(queue_entry=self)


    def requeue(self):
        """Return this entry to the queue, releasing any meta-host assignment."""
        self.set_status('Queued')
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        self.job.stop_if_necessary()


    @property
    def aborted_by(self):
        # Login of the user who aborted this entry, or None.
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # Timestamp of the abort, or None.
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached after the first lookup via the _aborted_by attribute.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, returns an agent to run the job.  Returns None
        otherwise.
        """
        self.set_status('Pending')
        self.get_host().set_status('Pending')
        if self.job.is_ready():
            return self.job.run(self)
        self.job.stop_if_necessary()
        return None


    def abort(self, dispatcher, agents_to_abort=None):
        """Abort this entry, queueing an AbortTask with the dispatcher.

        If the entry is active on a host, also queues reverify tasks for
        that host.

        @param agents_to_abort: optional list of agents for the AbortTask
                to abort.  BUGFIX: this default was previously a mutable
                list literal ([]), shared across all calls; use the None
                sentinel instead.
        """
        if agents_to_abort is None:
            agents_to_abort = []
        host = self.get_host()
        if self.active and host:
            dispatcher.add_agent(Agent(tasks=host.reverify_tasks()))

        abort_task = AbortTask(self, agents_to_abort)
        self.set_status('Aborting')
        dispatcher.add_agent(Agent(tasks=[abort_task], num_processes=0))

    def execution_tag(self):
        """Return '<job id>-<owner>/<subdir>', the entry's results path tag."""
        assert self.execution_subdir
        return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002273
2274
class Job(DBObject):
    """One row of the jobs table.

    Owns the control file and scheduling parameters for a job, and knows
    how to group its pending HostQueueEntries and launch autoserv on them.
    """
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after')


    def __init__(self, id=None, row=None, **kwargs):
        """Load a job either by database id or from a pre-fetched row."""
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)


    def is_server_job(self):
        # control_type 2 appears to denote a client-side job -- TODO confirm
        # against the frontend's control-type enum.
        return self.control_type != 2


    def tag(self):
        """Return '<id>-<owner>', the job's results directory tag."""
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        """Return all HostQueueEntries of this job; asserts at least one."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries) > 0

        return entries


    def set_status(self, status, update_queues=False):
        """Set the job's status; optionally propagate it to every entry."""
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_ready(self):
        """True when enough entries are Pending to satisfy synch_count."""
        pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
                                                               status='Pending')
        return (pending_entries.count() >= self.synch_count)


    def num_machines(self, clause=None):
        """Count this job's queue entries, optionally filtered by SQL clause."""
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        """True when every queue entry of this job is complete."""
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        """Queryset of this job's Queued/Pending (and optionally Verifying)
        entries."""
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Mark all Queued/Pending entries Stopped, freeing Pending hosts."""
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()

    def stop_if_necessary(self):
        """Stop the job's remaining entries once synch_count is unreachable."""
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        """Append the entry's hostname to the job's .machines file."""
        hostname = queue_entry.get_host().hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self):
        """Return the next unused 'groupN' execution subdir for this job."""
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in groups if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return "group%d" % next_id


    def _write_control_file(self, execution_tag):
        """Attach the control file to the execution; return its drone path."""
        control_path = _drone_manager.attach_file_to_execution(
                execution_tag, self.control_file)
        return control_path


    def get_group_entries(self, queue_entry_from_group):
        """Return all entries sharing the given entry's execution subdir."""
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _get_autoserv_params(self, queue_entries):
        """Build the autoserv command line for a group of queue entries."""
        assert queue_entries
        execution_tag = queue_entries[0].execution_tag()
        control_path = self._write_control_file(execution_tag)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = _autoserv_command_line(
            hostnames, execution_tag,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self)

        if not self.is_server_job():
            params.append('-c')

        return params


    def _should_run_cleanup(self, queue_entry):
        """Whether to clean the host first, per the reboot_before policy."""
        if self.reboot_before == models.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == models.RebootBefore.IF_DIRTY:
            return queue_entry.get_host().dirty
        return False


    def _should_run_verify(self, queue_entry):
        """Whether to verify the host; host protection can forbid it."""
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def _get_pre_job_tasks(self, queue_entry):
        """Tasks to run before the job: optional cleanup/verify, then pending."""
        tasks = []
        if self._should_run_cleanup(queue_entry):
            tasks.append(CleanupTask(queue_entry=queue_entry))
        if self._should_run_verify(queue_entry):
            tasks.append(VerifyTask(queue_entry=queue_entry))
        tasks.append(SetEntryPendingTask(queue_entry))
        return tasks


    def _assign_new_group(self, queue_entries):
        """Give all entries a common execution subdir (hostname or groupN)."""
        if len(queue_entries) == 1:
            group_name = queue_entries[0].get_host().hostname
        else:
            group_name = self._next_group_name()
            logging.info('Running synchronous job %d hosts %s as %s',
                         self.id,
                         [entry.host.hostname for entry in queue_entries],
                         group_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_name)


    def _choose_group_to_run(self, include_queue_entry):
        """Pick up to synch_count entries (always including the given one)
        and assign them a shared group."""
        chosen_entries = [include_queue_entry]

        num_entries_needed = self.synch_count - 1
        if num_entries_needed > 0:
            pending_entries = HostQueueEntry.fetch(
                where='job_id = %s AND status = "Pending" AND id != %s',
                params=(self.id, include_queue_entry.id))
            chosen_entries += list(pending_entries)[:num_entries_needed]

        self._assign_new_group(chosen_entries)
        return chosen_entries


    def run(self, queue_entry):
        """Run (or prepare) the job for queue_entry; returns an Agent.

        If not enough entries are Pending yet, returns an agent running
        the pre-job (cleanup/verify/pending) tasks instead.
        """
        if not self.is_ready():
            queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
            return Agent(self._get_pre_job_tasks(queue_entry))

        queue_entries = self._choose_group_to_run(queue_entry)
        return self._finish_run(queue_entries)


    def _finish_run(self, queue_entries, initial_tasks=None):
        """Mark the entries Starting and return an Agent running autoserv.

        @param initial_tasks: optional tasks to run before the QueueTask.
                BUGFIX: this default was previously a mutable list literal
                ([]); use the None sentinel instead.
        """
        if initial_tasks is None:
            initial_tasks = []
        for queue_entry in queue_entries:
            queue_entry.set_status('Starting')
        params = self._get_autoserv_params(queue_entries)
        queue_task = QueueTask(job=self, queue_entries=queue_entries,
                               cmd=params)
        tasks = initial_tasks + [queue_task]
        # (an unused entry_ids computation was removed here)

        return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002496
2497
# Entry point: run the scheduler only when executed as a script,
# not when this module is imported.
if __name__ == '__main__':
    main()