blob: b1a023e45c900395a6639ad3f2320010507261f0 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard89f84db2009-03-12 20:39:13 +00008import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard21baa452008-10-21 00:08:39 +000017from autotest_lib.frontend.afe import models
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# $AUTOTEST_DIR, when set, overrides the path derived from this file's location
if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# module-level scheduler state; _db and _base_url are filled in by
# init()/main_without_exception_handling()
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
if not os.environ.has_key('AUTOTEST_SCHEDULER_LOG_DIR'):
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
if os.environ.has_key('AUTOTEST_SCHEDULER_LOG_NAME'):
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
69
mbligh36768f02008-02-22 18:28:33 +000070
71def main():
showard27f33872009-04-07 18:20:53 +000072 try:
73 main_without_exception_handling()
74 except:
75 logging.exception('Exception escaping in monitor_db')
76 raise
77
78
def main_without_exception_handling():
    """
    Parse command-line options, load scheduler configuration, start the
    status server, then run the dispatcher loop until _shutdown is set
    (see handle_sigint).  On exit, flush queued emails and shut down the
    status server, drone manager and database connection.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # config value is a list separated by whitespace and/or , ; :
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        # run against a dummy autoserv and the stress-test database
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # main scheduling loop: one tick per iteration until SIGINT
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # orderly teardown, even after an uncaught exception above
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000154
155
def handle_sigint(signum, frame):
    """SIGINT handler: request a clean exit of the dispatcher main loop."""
    global _shutdown
    logging.info("Shutdown request received.")
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000160
161
def init(logfile):
    """
    One-time scheduler initialization: optional stdout/stderr redirection,
    pidfile, database connection, SIGINT handler and drone manager setup.

    @param logfile: path passed to enable_logging(), or falsy to skip
            redirection.
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # point the DB layer at the stress-test database instead
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    # make the autotest server binaries visible to child processes
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # comma-separated list of drone hosts; defaults to running locally
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000193
194
def enable_logging(logfile):
    """
    Redirect this process's stdout to *logfile* and stderr to *logfile*.err,
    both appended to and unbuffered.
    """
    stdout_path = logfile
    stderr_path = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", stdout_path, stderr_path)

    stdout_file = open(stdout_path, "a", buffering=0)
    stderr_file = open(stderr_path, "a", buffering=0)

    # swap the OS-level descriptors too, so spawned children inherit them
    os.dup2(stdout_file.fileno(), sys.stdout.fileno())
    os.dup2(stderr_file.fileno(), sys.stderr.fileno())

    sys.stdout = stdout_file
    sys.stderr = stderr_file
mbligh36768f02008-02-22 18:28:33 +0000207
208
def queue_entries_to_abort():
    """Return a HostQueueEntry for every row currently marked 'Abort'."""
    rows = _db.execute("""
    SELECT * FROM host_queue_entries WHERE status='Abort';
    """)
    return [HostQueueEntry(row=row) for row in rows]
mbligh36768f02008-02-22 18:28:33 +0000216
showard7cf9a9b2008-05-15 21:15:52 +0000217
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """
    Build the argv list used to launch autoserv.

    @param machines: comma-separated machine list for autoserv's -m flag.
    @param results_dir: results directory, resolved via the drone manager.
    @param extra_args: additional argv entries appended at the end.
    @param job/queue_entry: when either is given, the owning job's owner and
            name are passed through via -u/-l.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        owning_job = job if job else queue_entry.job
        command.extend(['-u', owning_job.owner, '-l', owning_job.name])
    return command + extra_args
227
228
class SchedulerError(Exception):
    """
    Raised by HostScheduler when an inconsistent state occurs, e.g. a host
    carrying more than one atomic group label.
    """
231
232
class HostScheduler(object):
    """
    Matches pending HostQueueEntries against hosts that are ready to run.

    refresh() snapshots the relevant DB state (ready hosts, ACLs, labels,
    dependencies) into in-memory dicts; the find_eligible_* methods then
    consume that cached state, popping hosts out of the available pool as
    they are handed out during a single scheduling cycle.
    """
    def _get_ready_hosts(self):
        """Return a dict of host id -> Host for all schedulable hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        # render ids as a comma-separated list for SQL IN (...) clauses
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """
        Run *query* (containing a %s placeholder for an id list) and return
        a dict mapping first column -> set of second column values
        (reversed when flip=True).
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold two-column rows into a dict of id -> set of related ids."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        # job id -> set of aclgroup ids the job's owner belongs to
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        # job id -> set of host ids the job may not run on
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        # job id -> set of label ids the job depends on
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        # host id -> set of aclgroup ids containing the host
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (label id -> host ids, host id -> label ids) mappings."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        # label id -> Label object, for every label in the database
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Snapshot the DB state needed to schedule *pending_queue_entries*."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        # the job's owner must share at least one ACL group with the host
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        # every dependency label of the job must be present on the host
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """
        Reject hosts carrying an only_if_needed label that the job neither
        depends on nor requested as its metahost.
        """
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """Combine all per-host eligibility checks for *queue_entry*."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """Claim the specific host requested by *queue_entry*, if eligible."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim any eligible available host carrying the metahost label."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return a Host for *queue_entry*, or None if none is available."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = random.sample(
                    eligible_hosts_in_group, max_hosts)

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
577
578
showard170873e2009-01-07 00:22:26 +0000579class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000580 def __init__(self):
581 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000582 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000583 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000584 user_cleanup_time = scheduler_config.config.clean_interval
585 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
586 _db, user_cleanup_time)
587 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000588 self._host_agents = {}
589 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000590
mbligh36768f02008-02-22 18:28:33 +0000591
showard915958d2009-04-22 21:00:58 +0000592 def initialize(self, recover_hosts=True):
593 self._periodic_cleanup.initialize()
594 self._24hr_upkeep.initialize()
595
jadmanski0afbb632008-06-06 21:10:57 +0000596 # always recover processes
597 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000598
jadmanski0afbb632008-06-06 21:10:57 +0000599 if recover_hosts:
600 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000601
602
    def tick(self):
        """Run one scheduler cycle; called repeatedly by the main loop."""
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        # flush the drone actions and notification emails queued up above
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000611
showard97aed502008-11-04 02:01:24 +0000612
mblighf3294cc2009-04-08 21:17:38 +0000613 def _run_cleanup(self):
614 self._periodic_cleanup.run_cleanup_maybe()
615 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000616
mbligh36768f02008-02-22 18:28:33 +0000617
showard170873e2009-01-07 00:22:26 +0000618 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
619 for object_id in object_ids:
620 agent_dict.setdefault(object_id, set()).add(agent)
621
622
623 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
624 for object_id in object_ids:
625 assert object_id in agent_dict
626 agent_dict[object_id].remove(agent)
627
628
jadmanski0afbb632008-06-06 21:10:57 +0000629 def add_agent(self, agent):
630 self._agents.append(agent)
631 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000632 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
633 self._register_agent_for_ids(self._queue_entry_agents,
634 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000635
showard170873e2009-01-07 00:22:26 +0000636
637 def get_agents_for_entry(self, queue_entry):
638 """
639 Find agents corresponding to the specified queue_entry.
640 """
641 return self._queue_entry_agents.get(queue_entry.id, set())
642
643
644 def host_has_agent(self, host):
645 """
646 Determine if there is currently an Agent present using this host.
647 """
648 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000649
650
jadmanski0afbb632008-06-06 21:10:57 +0000651 def remove_agent(self, agent):
652 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000653 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
654 agent)
655 self._unregister_agent_for_ids(self._queue_entry_agents,
656 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000657
658
showard4c5374f2008-09-04 17:02:56 +0000659 def num_running_processes(self):
660 return sum(agent.num_processes for agent in self._agents
661 if agent.is_running())
mblighbb421852008-03-11 22:36:16 +0000662
663
showard170873e2009-01-07 00:22:26 +0000664 def _extract_execution_tag(self, command_line):
665 match = re.match(r'.* -P (\S+) ', command_line)
666 if not match:
667 return None
668 return match.group(1)
mblighbb421852008-03-11 22:36:16 +0000669
670
showard2bab8f42008-11-12 18:15:22 +0000671 def _recover_queue_entries(self, queue_entries, run_monitor):
672 assert len(queue_entries) > 0
showard2bab8f42008-11-12 18:15:22 +0000673 queue_task = RecoveryQueueTask(job=queue_entries[0].job,
674 queue_entries=queue_entries,
675 run_monitor=run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +0000676 self.add_agent(Agent(tasks=[queue_task],
showard170873e2009-01-07 00:22:26 +0000677 num_processes=len(queue_entries)))
mblighbb421852008-03-11 22:36:16 +0000678
679
    def _recover_processes(self):
        """
        Recover scheduler state after a restart: re-register pidfiles,
        re-attach to running and aborting entries, requeue everything else
        that was active, and reverify the remaining hosts.
        """
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_running_entries()
        self._recover_aborting_entries()
        self._requeue_other_active_entries()
        self._recover_parsing_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000692
showard170873e2009-01-07 00:22:26 +0000693
694 def _register_pidfiles(self):
695 # during recovery we may need to read pidfiles for both running and
696 # parsing entries
697 queue_entries = HostQueueEntry.fetch(
698 where="status IN ('Running', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000699 for queue_entry in queue_entries:
showard170873e2009-01-07 00:22:26 +0000700 pidfile_id = _drone_manager.get_pidfile_id_from(
701 queue_entry.execution_tag())
702 _drone_manager.register_pidfile(pidfile_id)
703
704
705 def _recover_running_entries(self):
706 orphans = _drone_manager.get_orphaned_autoserv_processes()
707
708 queue_entries = HostQueueEntry.fetch(where="status = 'Running'")
709 requeue_entries = []
710 for queue_entry in queue_entries:
711 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000712 # synchronous job we've already recovered
713 continue
showard170873e2009-01-07 00:22:26 +0000714 execution_tag = queue_entry.execution_tag()
715 run_monitor = PidfileRunMonitor()
716 run_monitor.attach_to_existing_process(execution_tag)
717 if not run_monitor.has_process():
718 # autoserv apparently never got run, so let it get requeued
719 continue
showarde788ea62008-11-17 21:02:47 +0000720 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showardb18134f2009-03-20 20:52:18 +0000721 logging.info('Recovering %s (process %s)',
722 (', '.join(str(entry) for entry in queue_entries),
723 run_monitor.get_process()))
showard2bab8f42008-11-12 18:15:22 +0000724 self._recover_queue_entries(queue_entries, run_monitor)
showard170873e2009-01-07 00:22:26 +0000725 orphans.pop(execution_tag, None)
mbligh90a549d2008-03-25 23:52:34 +0000726
jadmanski0afbb632008-06-06 21:10:57 +0000727 # now kill any remaining autoserv processes
showard170873e2009-01-07 00:22:26 +0000728 for process in orphans.itervalues():
showardb18134f2009-03-20 20:52:18 +0000729 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000730 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000731
showard170873e2009-01-07 00:22:26 +0000732
733 def _recover_aborting_entries(self):
734 queue_entries = HostQueueEntry.fetch(
735 where='status IN ("Abort", "Aborting")')
jadmanski0afbb632008-06-06 21:10:57 +0000736 for queue_entry in queue_entries:
showardb18134f2009-03-20 20:52:18 +0000737 logging.info('Recovering aborting QE %s', queue_entry)
showard170873e2009-01-07 00:22:26 +0000738 agent = queue_entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000739
showard97aed502008-11-04 02:01:24 +0000740
showard170873e2009-01-07 00:22:26 +0000741 def _requeue_other_active_entries(self):
742 queue_entries = HostQueueEntry.fetch(
743 where='active AND NOT complete AND status != "Pending"')
744 for queue_entry in queue_entries:
745 if self.get_agents_for_entry(queue_entry):
746 # entry has already been recovered
747 continue
showardb18134f2009-03-20 20:52:18 +0000748 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
749 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000750 if queue_entry.host:
751 tasks = queue_entry.host.reverify_tasks()
752 self.add_agent(Agent(tasks))
753 agent = queue_entry.requeue()
754
755
    def _reverify_remaining_hosts(self):
        """
        Schedule reverification for hosts whose state implies they were in
        the middle of work when the scheduler died.
        """
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000771
772
jadmanski0afbb632008-06-06 21:10:57 +0000773 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000774 print_message='Reverifying host %s'):
775 full_where='locked = 0 AND invalid = 0 AND ' + where
776 for host in Host.fetch(where=full_where):
777 if self.host_has_agent(host):
778 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000779 continue
showard170873e2009-01-07 00:22:26 +0000780 if print_message:
showardb18134f2009-03-20 20:52:18 +0000781 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000782 tasks = host.reverify_tasks()
783 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000784
785
showard97aed502008-11-04 02:01:24 +0000786 def _recover_parsing_entries(self):
showard2bab8f42008-11-12 18:15:22 +0000787 recovered_entry_ids = set()
showard97aed502008-11-04 02:01:24 +0000788 for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
showard2bab8f42008-11-12 18:15:22 +0000789 if entry.id in recovered_entry_ids:
790 continue
791 queue_entries = entry.job.get_group_entries(entry)
showard170873e2009-01-07 00:22:26 +0000792 recovered_entry_ids = recovered_entry_ids.union(
793 entry.id for entry in queue_entries)
showardb18134f2009-03-20 20:52:18 +0000794 logging.info('Recovering parsing entries %s',
795 (', '.join(str(entry) for entry in queue_entries)))
showard97aed502008-11-04 02:01:24 +0000796
797 reparse_task = FinalReparseTask(queue_entries)
showard170873e2009-01-07 00:22:26 +0000798 self.add_agent(Agent([reparse_task], num_processes=0))
showard97aed502008-11-04 02:01:24 +0000799
800
jadmanski0afbb632008-06-06 21:10:57 +0000801 def _recover_hosts(self):
802 # recover "Repair Failed" hosts
803 message = 'Reverifying dead host %s'
804 self._reverify_hosts_where("status = 'Repair Failed'",
805 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000806
807
showard04c82c52008-05-29 19:38:12 +0000808
showardb95b1bd2008-08-15 18:11:04 +0000809 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000810 # prioritize by job priority, then non-metahost over metahost, then FIFO
811 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000812 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000813 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000814 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000815
816
showard89f84db2009-03-12 20:39:13 +0000817 def _refresh_pending_queue_entries(self):
818 """
819 Lookup the pending HostQueueEntries and call our HostScheduler
820 refresh() method given that list. Return the list.
821
822 @returns A list of pending HostQueueEntries sorted in priority order.
823 """
showard63a34772008-08-18 19:32:50 +0000824 queue_entries = self._get_pending_queue_entries()
825 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000826 return []
showardb95b1bd2008-08-15 18:11:04 +0000827
showard63a34772008-08-18 19:32:50 +0000828 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000829
showard89f84db2009-03-12 20:39:13 +0000830 return queue_entries
831
832
833 def _schedule_atomic_group(self, queue_entry):
834 """
835 Schedule the given queue_entry on an atomic group of hosts.
836
837 Returns immediately if there are insufficient available hosts.
838
839 Creates new HostQueueEntries based off of queue_entry for the
840 scheduled hosts and starts them all running.
841 """
842 # This is a virtual host queue entry representing an entire
843 # atomic group, find a group and schedule their hosts.
844 group_hosts = self._host_scheduler.find_eligible_atomic_group(
845 queue_entry)
846 if not group_hosts:
847 return
848 # The first assigned host uses the original HostQueueEntry
849 group_queue_entries = [queue_entry]
850 for assigned_host in group_hosts[1:]:
851 # Create a new HQE for every additional assigned_host.
852 new_hqe = HostQueueEntry.clone(queue_entry)
853 new_hqe.save()
854 group_queue_entries.append(new_hqe)
855 assert len(group_queue_entries) == len(group_hosts)
856 for queue_entry, host in itertools.izip(group_queue_entries,
857 group_hosts):
858 self._run_queue_entry(queue_entry, host)
859
860
861 def _schedule_new_jobs(self):
862 queue_entries = self._refresh_pending_queue_entries()
863 if not queue_entries:
864 return
865
showard63a34772008-08-18 19:32:50 +0000866 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000867 if (queue_entry.atomic_group_id is None or
868 queue_entry.host_id is not None):
869 assigned_host = self._host_scheduler.find_eligible_host(
870 queue_entry)
871 if assigned_host:
872 self._run_queue_entry(queue_entry, assigned_host)
873 else:
874 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000875
876
877 def _run_queue_entry(self, queue_entry, host):
878 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000879 # in some cases (synchronous jobs with run_verify=False), agent may be
880 # None
showard9976ce92008-10-15 20:28:13 +0000881 if agent:
882 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000883
884
jadmanski0afbb632008-06-06 21:10:57 +0000885 def _find_aborting(self):
jadmanski0afbb632008-06-06 21:10:57 +0000886 for entry in queue_entries_to_abort():
showard170873e2009-01-07 00:22:26 +0000887 agents_to_abort = list(self.get_agents_for_entry(entry))
showard1be97432008-10-17 15:30:45 +0000888 for agent in agents_to_abort:
889 self.remove_agent(agent)
890
showard170873e2009-01-07 00:22:26 +0000891 entry.abort(self, agents_to_abort)
jadmanski0afbb632008-06-06 21:10:57 +0000892
893
showard324bf812009-01-20 23:23:38 +0000894 def _can_start_agent(self, agent, num_started_this_cycle,
895 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000896 # always allow zero-process agents to run
897 if agent.num_processes == 0:
898 return True
899 # don't allow any nonzero-process agents to run after we've reached a
900 # limit (this avoids starvation of many-process agents)
901 if have_reached_limit:
902 return False
903 # total process throttling
showard324bf812009-01-20 23:23:38 +0000904 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000905 return False
906 # if a single agent exceeds the per-cycle throttling, still allow it to
907 # run when it's the first agent in the cycle
908 if num_started_this_cycle == 0:
909 return True
910 # per-cycle throttling
911 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000912 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000913 return False
914 return True
915
916
jadmanski0afbb632008-06-06 21:10:57 +0000917 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000918 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000919 have_reached_limit = False
920 # iterate over copy, so we can remove agents during iteration
921 for agent in list(self._agents):
922 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000923 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000924 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000925 continue
926 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000927 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000928 have_reached_limit):
929 have_reached_limit = True
930 continue
showard4c5374f2008-09-04 17:02:56 +0000931 num_started_this_cycle += agent.num_processes
932 agent.tick()
showardb18134f2009-03-20 20:52:18 +0000933 logging.info('%d running processes',
934 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000935
936
class PidfileRunMonitor(object):
    """
    Monitors an autoserv process through its pidfile via the drone manager.

    Client must call either run() to start a new process or
    attach_to_existing_process().

    Fix: run() previously duplicated the nice-prefix logic inline, leaving
    _add_nice_command dead; run() now delegates to it.
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # set True once we give up on the process (see on_lost_process)
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        """Prefix command with a nice invocation; no-op for a falsy level
        (nice -n 0 would be an identity anyway)."""
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        # remember when monitoring began, for the pidfile-write timeout
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """Launch command on a drone and begin monitoring its pidfile."""
        assert command is not None
        command = self._add_nice_command(command, nice_level)
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, log_file=log_file,
            pidfile_name=pidfile_name, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag):
        """Begin monitoring the pidfile of an already-running process
        identified by its execution tag."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(execution_tag)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        """Kill the monitored process, if one exists."""
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        """Return True if a process has been seen in the pidfile."""
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        """Return the monitored process; callers must ensure has_process()."""
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        """Refresh self._state from the pidfile; raises _PidfileException on
        invalid contents."""
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        """Log and email a pidfile problem, then mark the process lost."""
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException:
            # the exception details are captured via format_exc instead of
            # the (previously unused) bound exception object
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        # PIDFILE_TIMEOUT is a module-level constant defined elsewhere in
        # this file
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
        self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        # report a generic failure with no test results
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        """Return the process exit status, or None while still running."""
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """Return the failed-test count recorded in the pidfile."""
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001092
1093
class Agent(object):
    """Drives an ordered queue of AgentTasks, one active task at a time.

    host_ids and queue_entry_ids are the unions over all initial tasks and
    are used by the dispatcher for agent lookup.
    """
    def __init__(self, tasks, num_processes=1):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.num_processes = num_processes

        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        for task in tasks:
            self.add_task(task)


    def _union_ids(self, id_lists):
        combined = set()
        for ids in id_lists:
            combined.update(ids)
        return combined


    def add_task(self, task):
        """Append task to the queue and adopt it."""
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        """Poll the active task, advancing through the queue until a task
        is still in flight or everything is finished."""
        while not self.is_done():
            task = self.active_task
            if task and not task.is_done():
                task.poll()
                if not task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        logging.info("agent picking task")
        finished_task = self.active_task
        if finished_task:
            assert finished_task.is_done()

            if not finished_task.success:
                self.on_task_failure()

        self.active_task = None
        if self.is_done():
            return
        upcoming_task = self.queue.get_nowait()
        self.active_task = upcoming_task
        if upcoming_task:
            upcoming_task.start()


    def on_task_failure(self):
        # drop all remaining queued tasks
        self.queue = Queue.Queue(0)
        # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
        # get reset.
        recovery_agent = Agent(self.active_task.failure_tasks)
        self.dispatcher.add_agent(recovery_agent)


    def is_running(self):
        """True while a task is active."""
        return self.active_task is not None


    def is_done(self):
        """True once no task is active and none remain queued."""
        return self.active_task is None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001162
jadmanski0afbb632008-06-06 21:10:57 +00001163
class AgentTask(object):
    """
    Base class for one schedulable unit of work, driven by an Agent.

    Subclasses override prolog()/epilog() and usually supply an autoserv
    command line as cmd.

    Fix: failure_tasks previously defaulted to a shared mutable list
    ([]); every instance now gets its own list.
    """
    def __init__(self, cmd, working_directory=None, failure_tasks=None):
        """
        @param cmd: command list to execute (falsy for no-op tasks).
        @param working_directory: directory handed to the drone manager.
        @param failure_tasks: tasks run in a fresh Agent when this one fails.
        """
        if failure_tasks is None:
            failure_tasks = []
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None
        # None until finished; then True/False
        self.success = None
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        """Record the host/queue-entry ids this task affects, from either a
        list of entries (with hosts) or a single host."""
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Check on the monitored process; with no monitor, fail at once."""
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        # None means the process is still running; check again next cycle
        if exit_code is None:
            return
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Mark the task complete and run its epilog."""
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        # hook for subclasses, run once before the command starts
        pass


    def create_temp_resultsdir(self, suffix=''):
        # NOTE(review): suffix is kept for caller compatibility but is not
        # used -- the drone manager names the temporary path itself.
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        """Copy the task's log file to the results repository, if any."""
        if self.monitor and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        # hook for subclasses, run once after completion
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the monitored process (if any) and finish immediately."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        """Point log_file at a timestamped per-host log path."""
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        """Assert all entries share one execution tag and return it."""
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_and_parse_results(self, queue_entries):
        """Copy results to the repository and schedule a final reparse for
        the group of entries."""
        assert len(queue_entries) > 0
        assert self.monitor
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(self.monitor.get_process(),
                                                  results_path)

        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def run(self):
        """Launch cmd (if any) under a PidfileRunMonitor."""
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001286
1287
class RepairTask(AgentTask):
    """Runs autoserv -R against a host; when an associated queue entry is
    supplied, a failed repair fails that entry out as well."""

    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection_name = host_protections.Protection.get_string(
            host.protection)
        # normalize the protection name
        protection_name = host_protections.Protection.get_attr_name(
            protection_name)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.create_temp_resultsdir('.repair')
        repair_cmd = _autoserv_command_line(
            host.hostname, self.temp_results_dir,
            ['-R', '--host-protection', protection_name],
            queue_entry=queue_entry)
        super(RepairTask, self).__init__(repair_cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _fail_queue_entry(self):
        entry = self.queue_entry_to_fail
        assert entry

        if entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        entry.update_from_database()
        if entry.status != 'Queued':
            return # entry has been aborted

        entry.set_execution_subdir()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=entry.execution_tag() + '/')

        self._copy_and_parse_results([entry])
        entry.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
            return
        self.host.set_status('Repair Failed')
        if self.queue_entry_to_fail:
            self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001348
1349
showard8fe93b52008-11-18 17:53:22 +00001350class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001351 def epilog(self):
1352 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001353 should_copy_results = (self.queue_entry and not self.success
1354 and not self.queue_entry.meta_host)
1355 if should_copy_results:
1356 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001357 destination = os.path.join(self.queue_entry.execution_tag(),
1358 os.path.basename(self.log_file))
1359 _drone_manager.copy_to_results_repository(
1360 self.monitor.get_process(), self.log_file,
1361 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001362
1363
class VerifyTask(PreJobTask):
    """Runs autoserv -v against a host, either standalone or on behalf of a
    queue entry; a failed verify triggers a RepairTask."""

    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry/host must be supplied
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        verify_cmd = _autoserv_command_line(
            self.host.hostname, self.temp_results_dir, ['-v'],
            queue_entry=queue_entry)
        repair_on_failure = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(verify_cmd, self.temp_results_dir,
                                         failure_tasks=repair_on_failure)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001394
1395
class QueueTask(AgentTask):
    """Runs the main autoserv process for a job's queue entries.

    Responsible for writing job-level and per-host keyvals, updating
    entry/host status around the run, copying and parsing results when the
    process finishes, and scheduling post-job host cleanup/reboot.
    """

    def __init__(self, job, queue_entries, cmd):
        self.job = job
        self.queue_entries = queue_entries
        super(QueueTask, self).__init__(cmd, self._execution_tag())
        self._set_ids(queue_entries=queue_entries)


    def _format_keyval(self, key, value):
        """Render one keyval line in 'key=value' form."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Path of the job-level keyval file within the execution dir."""
        return os.path.join(self._execution_tag(), 'keyval')


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict as a file to be created with the execution."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_tag(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_keyval_after_job(self, field, value):
        """Append a single keyval to the job's keyval file post-run."""
        assert self.monitor and self.monitor.has_process()
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _write_host_keyvals(self, host):
        """Write a host's platform and labels into host_keyvals/<hostname>."""
        keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_tag(self):
        # all entries in this task share a single execution
        return self.queue_entries[0].execution_tag()


    def prolog(self):
        queued = int(time.mktime(self.job.created_on.timetuple()))
        self._write_keyvals_before_job({'job_queued': queued})
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        error_file_path = os.path.join(self._execution_tag(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self, success):
        """Record completion keyvals and kick off results copy/parse."""
        if self.monitor.has_process():
            self._write_keyval_after_job("job_finished", int(time.time()))
            self._copy_and_parse_results(self.queue_entries)

        if self.monitor.lost_process:
            # the process disappeared; record an error and fail the entries
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        _drone_manager.write_lines_to_file(
            os.path.join(self._execution_tag(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job and when, in keyvals and status.log."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task(False)


    def _reboot_hosts(self):
        """Per the job's reboot_after setting, schedule a cleanup (which
        reboots) for each host, or mark the host Ready immediately."""
        reboot_after = self.job.reboot_after
        do_reboot = False
        if reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            num_tests_failed = self.monitor.num_tests_failed()
            do_reboot = (self.success and num_tests_failed == 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.get_host())
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task(self.success)
        self._reboot_hosts()

        # bug fix: message previously misspelled "success" as "succes"
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001543
1544
class RecoveryQueueTask(QueueTask):
    """A QueueTask attached to an already-running autoserv process.

    Used when the scheduler restarts and recovers in-flight jobs: instead
    of spawning a new process, run() simply adopts the supplied monitor,
    and prolog() is skipped since the job already started.
    """
    def __init__(self, job, queue_entries, run_monitor):
        super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
        self.run_monitor = run_monitor


    def run(self):
        # adopt the pre-existing process rather than launching a new one
        self.monitor = self.run_monitor


    def prolog(self):
        # recovering an existing process - don't do prolog
        pass
mblighbb421852008-03-11 22:36:16 +00001558
1559
class CleanupTask(PreJobTask):
    """Runs 'autoserv --cleanup' on a host before it takes a new job.

    Accepts exactly one of a host or a queue entry; a failed cleanup falls
    back to a RepairTask on the same host.
    """
    def __init__(self, host=None, queue_entry=None):
        assert bool(host) ^ bool(queue_entry)
        if queue_entry:
            host = queue_entry.get_host()
        self.queue_entry = queue_entry
        self.host = host

        self.create_temp_resultsdir('.cleanup')
        self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                          ['--cleanup'],
                                          queue_entry=queue_entry)
        # a failed cleanup is followed by an attempt to repair the host
        fallback = RepairTask(host, queue_entry=queue_entry)
        super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
                                          failure_tasks=[fallback])

        self._set_ids(host=host, queue_entries=[queue_entry])
        self.set_host_log_file('cleanup', self.host)


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status("Cleaning")


    def epilog(self):
        super(CleanupTask, self).epilog()
        if not self.success:
            return
        self.host.set_status('Ready')
        self.host.update_field('dirty', 0)
1591
1592
class AbortTask(AgentTask):
    """Aborts the active tasks of other agents working on a queue entry."""
    def __init__(self, queue_entry, agents_to_abort):
        super(AbortTask, self).__init__('')
        self.queue_entry = queue_entry
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [queue_entry.id]
        self.agents_to_abort = agents_to_abort


    def prolog(self):
        logging.info("starting abort on host %s, job %s",
                     self.queue_entry.host_id, self.queue_entry.job_id)


    def epilog(self):
        super(AbortTask, self).epilog()
        self.queue_entry.set_status('Aborted')
        self.success = True


    def run(self):
        """Abort whatever each target agent is currently executing."""
        for agent in self.agents_to_abort:
            active = agent.active_task
            if active:
                active.abort()
mbligh36768f02008-02-22 18:28:33 +00001617
1618
class FinalReparseTask(AgentTask):
    """Runs the results parser over a completed job's results directory.

    Concurrent parses are capped at
    scheduler_config.config.max_parse_processes via the class-level
    _num_running_parses counter; run()/poll() keep retrying until a slot
    is free before actually launching the parser process.
    """

    # scheduler-wide count of parse processes currently running
    _num_running_parses = 0

    def __init__(self, queue_entries):
        self._queue_entries = queue_entries
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]
        self._parse_started = False

        assert len(queue_entries) > 0
        queue_entry = queue_entries[0]

        # all entries share one execution; its directory is what gets parsed
        self._execution_tag = queue_entry.execution_tag()
        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
        # attach to the finished autoserv process to read its exit status
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
        self._final_status = self._determine_final_status()

        if _testing_mode:
            # NOTE(review): in testing mode the AgentTask constructor is
            # skipped entirely and only cmd is set -- presumably intentional
            # for the tests; confirm.
            self.cmd = 'true'
        else:
            super(FinalReparseTask, self).__init__(
                cmd=self._generate_parse_command(),
                working_directory=self._execution_tag)

        self.log_file = os.path.join(self._execution_tag, '.parse.log')


    @classmethod
    def _increment_running_parses(cls):
        cls._num_running_parses += 1


    @classmethod
    def _decrement_running_parses(cls):
        cls._num_running_parses -= 1


    @classmethod
    def _can_run_new_parse(cls):
        # enforce the global cap on simultaneous parse processes
        return (cls._num_running_parses <
                scheduler_config.config.max_parse_processes)


    def _determine_final_status(self):
        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def prolog(self):
        super(FinalReparseTask, self).prolog()
        for queue_entry in self._queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        # the entries' final status was already determined in __init__ from
        # autoserv's exit code, not from the parse outcome
        for queue_entry in self._queue_entries:
            queue_entry.set_status(self._final_status)


    def _generate_parse_command(self):
        # parser invocation: write a pidfile, log level 2, reparse (-r),
        # output into the job's results directory
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                self._results_dir]


    def poll(self):
        # override poll to keep trying to start until the parse count goes down
        # and we can, at which point we revert to default behavior
        if self._parse_started:
            super(FinalReparseTask, self).poll()
        else:
            self._try_starting_parse()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_parse()


    def _try_starting_parse(self):
        """Launch the parser if a slot is free and results exist; no-op
        otherwise (poll() will retry)."""
        if not self._can_run_new_parse():
            return

        # make sure we actually have results to parse
        # this should never happen in normal operation
        if not self._autoserv_monitor.has_process():
            email_manager.manager.enqueue_notify_email(
                'No results to parse',
                'No results to parse at %s' % self._autoserv_monitor.pidfile_id)
            self.finished(False)
            return

        # actually run the parse command
        self.monitor = PidfileRunMonitor()
        self.monitor.run(self.cmd, self._working_directory,
                         log_file=self.log_file,
                         pidfile_name='.parser_execute',
                         paired_with_pidfile=self._autoserv_monitor.pidfile_id)

        self._increment_running_parses()
        self._parse_started = True


    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        # only release a parse slot if we actually consumed one
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001729
1730
class SetEntryPendingTask(AgentTask):
    """No-command task that transitions a queue entry into Pending."""
    def __init__(self, queue_entry):
        super(SetEntryPendingTask, self).__init__(cmd='')
        self._queue_entry = queue_entry
        self._set_ids(queue_entries=[queue_entry])


    def run(self):
        """Notify the entry it is pending; schedule any agent it returns."""
        follow_up_agent = self._queue_entry.on_pending()
        if follow_up_agent:
            self.agent.dispatcher.add_agent(follow_up_agent)
        self.finished(True)
1743
1744
class DBError(Exception):
    """Raised when a DBObject lookup fails to find a matching row."""
1747
1748
class DBObject(object):
    """A miniature object relational model for the database.

    Instances of the same (subclass, id) pair are shared through a
    weak-value cache so that e.g. many HostQueueEntry objects for one Job
    reuse a single Job instance instead of re-querying.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id. This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id. If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id - Database id to load; mutually exclusive with row.
        @param row - Pre-fetched SELECT row; mutually exclusive with id.
        @param new_record - True if this object is not yet in the database.
        @param always_query - If False, an already-initialized cached
                instance is returned without re-querying the database.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Return the row for row_id, raising DBError if it doesn't exist."""
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in memory fields.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is not updatable through update_field()
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read this object's row and refresh all field attributes."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        """Return the number of rows in table (default: our own table)
        matching the given WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value, condition=''):
        """Persist a single field change (no-op if the value is unchanged).

        @param condition - optional extra SQL restricting the UPDATE.
        """
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        if condition:
            query += ' AND (%s)' % condition
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object as a new row and adopt the database's id."""
        if self.__new_record:
            keys = self._fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this row from the database and drop the cached instance."""
        # Bug fix: this previously popped (type(self), id) where 'id' was
        # the *builtin* function, so the cache entry was never removed.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return string with prefix prepended, or string unchanged if it
        is empty/falsy."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        for row in rows:
            yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00001949
mbligh36768f02008-02-22 18:28:33 +00001950
class IneligibleHostQueue(DBObject):
    """Records that a host may not be scheduled for a particular job."""
    _table_name = 'ineligible_host_queues'
    # field order must match the table's column order
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00001954
1955
class AtomicGroup(DBObject):
    """ORM wrapper for a row of the atomic_groups table."""
    _table_name = 'atomic_groups'
    # field order must match the table's column order
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00001960
1961
class Label(DBObject):
    """ORM wrapper for a row of the labels table."""
    _table_name = 'labels'
    # field order must match the table's column order
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00001966
1967
class Host(DBObject):
    """ORM wrapper for a row of the hosts table."""
    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def current_task(self):
        """Return this host's active, incomplete HostQueueEntry, or None."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                """, (self.id,))

        if not rows:
            return None
        # a host works on at most one entry at a time
        assert len(rows) == 1
        return HostQueueEntry(row=rows[0])


    def yield_work(self):
        """Requeue whatever entry this host is currently working on."""
        logging.info("%s yielding work", self.hostname)
        if self.current_task():
            self.current_task().requeue()


    def set_status(self, status):
        """Log and persist a status change for this host."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """Return (platform_name_or_None, list of all label names sorted by
        name)."""
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        platform = None
        label_names = []
        for name, is_platform in rows:
            if is_platform:
                platform = name
            label_names.append(name)
        return platform, label_names


    def reverify_tasks(self):
        """Build the [cleanup, verify] task pair used to re-check a host."""
        cleanup = CleanupTask(host=self)
        verify = VerifyTask(host=self)
        # just to make sure this host does not get taken away
        self.set_status('Cleaning')
        return [cleanup, verify]
showardd8e548a2008-09-09 03:04:57 +00002024
2025
mbligh36768f02008-02-22 18:28:33 +00002026class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002027 _table_name = 'host_queue_entries'
2028 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002029 'active', 'complete', 'deleted', 'execution_subdir',
2030 'atomic_group_id')
showard6ae5ea92009-02-25 00:11:51 +00002031
2032
showarda3c58572009-03-12 20:36:59 +00002033 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002034 assert id or row
showarda3c58572009-03-12 20:36:59 +00002035 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002036 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002037
jadmanski0afbb632008-06-06 21:10:57 +00002038 if self.host_id:
2039 self.host = Host(self.host_id)
2040 else:
2041 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002042
showard170873e2009-01-07 00:22:26 +00002043 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002044 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002045
2046
showard89f84db2009-03-12 20:39:13 +00002047 @classmethod
2048 def clone(cls, template):
2049 """
2050 Creates a new row using the values from a template instance.
2051
2052 The new instance will not exist in the database or have a valid
2053 id attribute until its save() method is called.
2054 """
2055 assert isinstance(template, cls)
2056 new_row = [getattr(template, field) for field in cls._fields]
2057 clone = cls(row=new_row, new_record=True)
2058 clone.id = None
2059 return clone
2060
2061
showardc85c21b2008-11-24 22:17:37 +00002062 def _view_job_url(self):
2063 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2064
2065
jadmanski0afbb632008-06-06 21:10:57 +00002066 def set_host(self, host):
2067 if host:
2068 self.queue_log_record('Assigning host ' + host.hostname)
2069 self.update_field('host_id', host.id)
2070 self.update_field('active', True)
2071 self.block_host(host.id)
2072 else:
2073 self.queue_log_record('Releasing host')
2074 self.unblock_host(self.host.id)
2075 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002076
jadmanski0afbb632008-06-06 21:10:57 +00002077 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002078
2079
jadmanski0afbb632008-06-06 21:10:57 +00002080 def get_host(self):
2081 return self.host
mbligh36768f02008-02-22 18:28:33 +00002082
2083
jadmanski0afbb632008-06-06 21:10:57 +00002084 def queue_log_record(self, log_line):
2085 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002086 _drone_manager.write_lines_to_file(self.queue_log_path,
2087 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002088
2089
jadmanski0afbb632008-06-06 21:10:57 +00002090 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002091 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002092 row = [0, self.job.id, host_id]
2093 block = IneligibleHostQueue(row=row, new_record=True)
2094 block.save()
mblighe2586682008-02-29 22:45:46 +00002095
2096
jadmanski0afbb632008-06-06 21:10:57 +00002097 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002098 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002099 blocks = IneligibleHostQueue.fetch(
2100 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2101 for block in blocks:
2102 block.delete()
mblighe2586682008-02-29 22:45:46 +00002103
2104
showard2bab8f42008-11-12 18:15:22 +00002105 def set_execution_subdir(self, subdir=None):
2106 if subdir is None:
2107 assert self.get_host()
2108 subdir = self.get_host().hostname
2109 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002110
2111
showard6355f6b2008-12-05 18:52:13 +00002112 def _get_hostname(self):
2113 if self.host:
2114 return self.host.hostname
2115 return 'no host'
2116
2117
showard170873e2009-01-07 00:22:26 +00002118 def __str__(self):
2119 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2120
2121
jadmanski0afbb632008-06-06 21:10:57 +00002122 def set_status(self, status):
mblighf8c624d2008-07-03 16:58:45 +00002123 abort_statuses = ['Abort', 'Aborting', 'Aborted']
2124 if status not in abort_statuses:
2125 condition = ' AND '.join(['status <> "%s"' % x
2126 for x in abort_statuses])
2127 else:
2128 condition = ''
2129 self.update_field('status', status, condition=condition)
2130
showardb18134f2009-03-20 20:52:18 +00002131 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002132
showardc85c21b2008-11-24 22:17:37 +00002133 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002134 self.update_field('complete', False)
2135 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002136
jadmanski0afbb632008-06-06 21:10:57 +00002137 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showarde58e3f82008-11-20 19:04:59 +00002138 'Aborting']:
jadmanski0afbb632008-06-06 21:10:57 +00002139 self.update_field('complete', False)
2140 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002141
showardc85c21b2008-11-24 22:17:37 +00002142 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002143 self.update_field('complete', True)
2144 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002145
2146 should_email_status = (status.lower() in _notify_email_statuses or
2147 'all' in _notify_email_statuses)
2148 if should_email_status:
2149 self._email_on_status(status)
2150
2151 self._email_on_job_complete()
2152
2153
2154 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002155 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002156
2157 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2158 self.job.id, self.job.name, hostname, status)
2159 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2160 self.job.id, self.job.name, hostname, status,
2161 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002162 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002163
2164
2165 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002166 if not self.job.is_finished():
2167 return
showard542e8402008-09-19 20:16:18 +00002168
showardc85c21b2008-11-24 22:17:37 +00002169 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002170 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002171 for queue_entry in hosts_queue:
2172 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002173 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002174 queue_entry.status))
2175
2176 summary_text = "\n".join(summary_text)
2177 status_counts = models.Job.objects.get_status_counts(
2178 [self.job.id])[self.job.id]
2179 status = ', '.join('%d %s' % (count, status) for status, count
2180 in status_counts.iteritems())
2181
2182 subject = 'Autotest: Job ID: %s "%s" %s' % (
2183 self.job.id, self.job.name, status)
2184 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2185 self.job.id, self.job.name, status, self._view_job_url(),
2186 summary_text)
showard170873e2009-01-07 00:22:26 +00002187 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002188
2189
showard89f84db2009-03-12 20:39:13 +00002190 def run(self, assigned_host=None):
2191 if self.meta_host is not None or self.atomic_group_id is not None:
jadmanski0afbb632008-06-06 21:10:57 +00002192 assert assigned_host
2193 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00002194 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00002195
showardb18134f2009-03-20 20:52:18 +00002196 logging.info("%s/%s/%s scheduled on %s, status=%s",
2197 self.job.name, self.meta_host, self.atomic_group_id,
2198 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002199
jadmanski0afbb632008-06-06 21:10:57 +00002200 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002201
showard6ae5ea92009-02-25 00:11:51 +00002202
jadmanski0afbb632008-06-06 21:10:57 +00002203 def requeue(self):
2204 self.set_status('Queued')
showardde634ee2009-01-30 01:44:24 +00002205 # verify/cleanup failure sets the execution subdir, so reset it here
2206 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002207 if self.meta_host:
2208 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002209
2210
jadmanski0afbb632008-06-06 21:10:57 +00002211 def handle_host_failure(self):
2212 """\
2213 Called when this queue entry's host has failed verification and
2214 repair.
2215 """
2216 assert not self.meta_host
2217 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002218 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002219
2220
jadmanskif7fa2cc2008-10-01 14:13:23 +00002221 @property
2222 def aborted_by(self):
2223 self._load_abort_info()
2224 return self._aborted_by
2225
2226
2227 @property
2228 def aborted_on(self):
2229 self._load_abort_info()
2230 return self._aborted_on
2231
2232
2233 def _load_abort_info(self):
2234 """ Fetch info about who aborted the job. """
2235 if hasattr(self, "_aborted_by"):
2236 return
2237 rows = _db.execute("""
2238 SELECT users.login, aborted_host_queue_entries.aborted_on
2239 FROM aborted_host_queue_entries
2240 INNER JOIN users
2241 ON users.id = aborted_host_queue_entries.aborted_by_id
2242 WHERE aborted_host_queue_entries.queue_entry_id = %s
2243 """, (self.id,))
2244 if rows:
2245 self._aborted_by, self._aborted_on = rows[0]
2246 else:
2247 self._aborted_by = self._aborted_on = None
2248
2249
showardb2e2c322008-10-14 17:33:55 +00002250 def on_pending(self):
2251 """
2252 Called when an entry in a synchronous job has passed verify. If the
2253 job is ready to run, returns an agent to run the job. Returns None
2254 otherwise.
2255 """
2256 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002257 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002258 if self.job.is_ready():
2259 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002260 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002261 return None
2262
2263
showard170873e2009-01-07 00:22:26 +00002264 def abort(self, dispatcher, agents_to_abort=[]):
showard1be97432008-10-17 15:30:45 +00002265 host = self.get_host()
showard9d9ffd52008-11-09 23:14:35 +00002266 if self.active and host:
showard170873e2009-01-07 00:22:26 +00002267 dispatcher.add_agent(Agent(tasks=host.reverify_tasks()))
showard1be97432008-10-17 15:30:45 +00002268
showard170873e2009-01-07 00:22:26 +00002269 abort_task = AbortTask(self, agents_to_abort)
showard1be97432008-10-17 15:30:45 +00002270 self.set_status('Aborting')
showard170873e2009-01-07 00:22:26 +00002271 dispatcher.add_agent(Agent(tasks=[abort_task], num_processes=0))
2272
2273 def execution_tag(self):
2274 assert self.execution_subdir
2275 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002276
2277
mbligh36768f02008-02-22 18:28:33 +00002278class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002279 _table_name = 'jobs'
2280 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2281 'control_type', 'created_on', 'synch_count', 'timeout',
2282 'run_verify', 'email_list', 'reboot_before', 'reboot_after')
2283
2284
showarda3c58572009-03-12 20:36:59 +00002285 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002286 assert id or row
showarda3c58572009-03-12 20:36:59 +00002287 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002288
mblighe2586682008-02-29 22:45:46 +00002289
jadmanski0afbb632008-06-06 21:10:57 +00002290 def is_server_job(self):
2291 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002292
2293
showard170873e2009-01-07 00:22:26 +00002294 def tag(self):
2295 return "%s-%s" % (self.id, self.owner)
2296
2297
jadmanski0afbb632008-06-06 21:10:57 +00002298 def get_host_queue_entries(self):
2299 rows = _db.execute("""
2300 SELECT * FROM host_queue_entries
2301 WHERE job_id= %s
2302 """, (self.id,))
2303 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002304
jadmanski0afbb632008-06-06 21:10:57 +00002305 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002306
jadmanski0afbb632008-06-06 21:10:57 +00002307 return entries
mbligh36768f02008-02-22 18:28:33 +00002308
2309
jadmanski0afbb632008-06-06 21:10:57 +00002310 def set_status(self, status, update_queues=False):
2311 self.update_field('status',status)
2312
2313 if update_queues:
2314 for queue_entry in self.get_host_queue_entries():
2315 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002316
2317
jadmanski0afbb632008-06-06 21:10:57 +00002318 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002319 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2320 status='Pending')
2321 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002322
2323
jadmanski0afbb632008-06-06 21:10:57 +00002324 def num_machines(self, clause = None):
2325 sql = "job_id=%s" % self.id
2326 if clause:
2327 sql += " AND (%s)" % clause
2328 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002329
2330
jadmanski0afbb632008-06-06 21:10:57 +00002331 def num_queued(self):
2332 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002333
2334
jadmanski0afbb632008-06-06 21:10:57 +00002335 def num_active(self):
2336 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002337
2338
jadmanski0afbb632008-06-06 21:10:57 +00002339 def num_complete(self):
2340 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002341
2342
jadmanski0afbb632008-06-06 21:10:57 +00002343 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002344 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002345
mbligh36768f02008-02-22 18:28:33 +00002346
showard6bb7c292009-01-30 01:44:51 +00002347 def _not_yet_run_entries(self, include_verifying=True):
2348 statuses = [models.HostQueueEntry.Status.QUEUED,
2349 models.HostQueueEntry.Status.PENDING]
2350 if include_verifying:
2351 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2352 return models.HostQueueEntry.objects.filter(job=self.id,
2353 status__in=statuses)
2354
2355
2356 def _stop_all_entries(self):
2357 entries_to_stop = self._not_yet_run_entries(
2358 include_verifying=False)
2359 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002360 assert not child_entry.complete, (
2361 '%s status=%s, active=%s, complete=%s' %
2362 (child_entry.id, child_entry.status, child_entry.active,
2363 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002364 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2365 child_entry.host.status = models.Host.Status.READY
2366 child_entry.host.save()
2367 child_entry.status = models.HostQueueEntry.Status.STOPPED
2368 child_entry.save()
2369
showard2bab8f42008-11-12 18:15:22 +00002370 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002371 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002372 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002373 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002374
2375
jadmanski0afbb632008-06-06 21:10:57 +00002376 def write_to_machines_file(self, queue_entry):
2377 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002378 file_path = os.path.join(self.tag(), '.machines')
2379 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002380
2381
showard2bab8f42008-11-12 18:15:22 +00002382 def _next_group_name(self):
2383 query = models.HostQueueEntry.objects.filter(
2384 job=self.id).values('execution_subdir').distinct()
2385 subdirs = (entry['execution_subdir'] for entry in query)
2386 groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
2387 ids = [int(match.group(1)) for match in groups if match]
2388 if ids:
2389 next_id = max(ids) + 1
2390 else:
2391 next_id = 0
2392 return "group%d" % next_id
2393
2394
showard170873e2009-01-07 00:22:26 +00002395 def _write_control_file(self, execution_tag):
2396 control_path = _drone_manager.attach_file_to_execution(
2397 execution_tag, self.control_file)
2398 return control_path
mbligh36768f02008-02-22 18:28:33 +00002399
showardb2e2c322008-10-14 17:33:55 +00002400
showard2bab8f42008-11-12 18:15:22 +00002401 def get_group_entries(self, queue_entry_from_group):
2402 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002403 return list(HostQueueEntry.fetch(
2404 where='job_id=%s AND execution_subdir=%s',
2405 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002406
2407
showardb2e2c322008-10-14 17:33:55 +00002408 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002409 assert queue_entries
2410 execution_tag = queue_entries[0].execution_tag()
2411 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002412 hostnames = ','.join([entry.get_host().hostname
2413 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002414
showard87ba02a2009-04-20 19:37:32 +00002415 params = _autoserv_command_line(
2416 hostnames, execution_tag,
2417 ['-P', execution_tag, '-n',
2418 _drone_manager.absolute_path(control_path)],
2419 job=self)
mbligh36768f02008-02-22 18:28:33 +00002420
jadmanski0afbb632008-06-06 21:10:57 +00002421 if not self.is_server_job():
2422 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002423
showardb2e2c322008-10-14 17:33:55 +00002424 return params
mblighe2586682008-02-29 22:45:46 +00002425
mbligh36768f02008-02-22 18:28:33 +00002426
showardc9ae1782009-01-30 01:42:37 +00002427 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002428 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002429 return True
showard0fc38302008-10-23 00:44:07 +00002430 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002431 return queue_entry.get_host().dirty
2432 return False
showard21baa452008-10-21 00:08:39 +00002433
showardc9ae1782009-01-30 01:42:37 +00002434
2435 def _should_run_verify(self, queue_entry):
2436 do_not_verify = (queue_entry.host.protection ==
2437 host_protections.Protection.DO_NOT_VERIFY)
2438 if do_not_verify:
2439 return False
2440 return self.run_verify
2441
2442
2443 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002444 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002445 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002446 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002447 if self._should_run_verify(queue_entry):
2448 tasks.append(VerifyTask(queue_entry=queue_entry))
2449 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002450 return tasks
2451
2452
showard2bab8f42008-11-12 18:15:22 +00002453 def _assign_new_group(self, queue_entries):
2454 if len(queue_entries) == 1:
2455 group_name = queue_entries[0].get_host().hostname
2456 else:
2457 group_name = self._next_group_name()
showardb18134f2009-03-20 20:52:18 +00002458 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002459 self.id, [entry.host.hostname for entry in queue_entries],
2460 group_name)
2461
2462 for queue_entry in queue_entries:
2463 queue_entry.set_execution_subdir(group_name)
2464
2465
2466 def _choose_group_to_run(self, include_queue_entry):
2467 chosen_entries = [include_queue_entry]
2468
2469 num_entries_needed = self.synch_count - 1
2470 if num_entries_needed > 0:
2471 pending_entries = HostQueueEntry.fetch(
2472 where='job_id = %s AND status = "Pending" AND id != %s',
2473 params=(self.id, include_queue_entry.id))
2474 chosen_entries += list(pending_entries)[:num_entries_needed]
2475
2476 self._assign_new_group(chosen_entries)
2477 return chosen_entries
2478
2479
2480 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002481 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002482 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2483 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002484
showard2bab8f42008-11-12 18:15:22 +00002485 queue_entries = self._choose_group_to_run(queue_entry)
2486 return self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00002487
2488
2489 def _finish_run(self, queue_entries, initial_tasks=[]):
showardb2ccdda2008-10-28 20:39:05 +00002490 for queue_entry in queue_entries:
2491 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002492 params = self._get_autoserv_params(queue_entries)
2493 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2494 cmd=params)
2495 tasks = initial_tasks + [queue_task]
2496 entry_ids = [entry.id for entry in queue_entries]
2497
showard170873e2009-01-07 00:22:26 +00002498 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002499
2500
mbligh36768f02008-02-22 18:28:33 +00002501if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002502 main()