blob: 388dfe5498a959e70e171e7558521a442d24b90d [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard89f84db2009-03-12 20:39:13 +00008import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showarda3c58572009-03-12 20:36:59 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showard2bab8f42008-11-12 18:15:22 +000015from autotest_lib.client.common_lib import host_protections, utils, debug
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard21baa452008-10-21 00:08:39 +000017from autotest_lib.frontend.afe import models
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
showardd1ee1dd2009-01-07 21:33:08 +000019from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000020
mblighb090f142008-02-27 21:33:46 +000021
mbligh36768f02008-02-22 18:28:33 +000022RESULTS_DIR = '.'
23AUTOSERV_NICE_LEVEL = 10
showard170873e2009-01-07 00:22:26 +000024DB_CONFIG_SECTION = 'AUTOTEST_WEB'
mbligh36768f02008-02-22 18:28:33 +000025
26AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
27
28if os.environ.has_key('AUTOTEST_DIR'):
jadmanski0afbb632008-06-06 21:10:57 +000029 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
mbligh36768f02008-02-22 18:28:33 +000030AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
31AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
32
33if AUTOTEST_SERVER_DIR not in sys.path:
jadmanski0afbb632008-06-06 21:10:57 +000034 sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000035
mbligh90a549d2008-03-25 23:52:34 +000036# how long to wait for autoserv to write a pidfile
37PIDFILE_TIMEOUT = 5 * 60 # 5 min
mblighbb421852008-03-11 22:36:16 +000038
showard35162b02009-03-03 02:17:30 +000039# error message to leave in results dir when an autoserv process disappears
40# mysteriously
41_LOST_PROCESS_ERROR = """\
42Autoserv failed abnormally during execution for this job, probably due to a
43system error on the Autotest server. Full results may not be available. Sorry.
44"""
45
mbligh6f8bab42008-02-29 22:45:14 +000046_db = None
mbligh36768f02008-02-22 18:28:33 +000047_shutdown = False
showard170873e2009-01-07 00:22:26 +000048_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
49_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
mbligh4314a712008-02-29 22:44:30 +000050_testing_mode = False
showard542e8402008-09-19 20:16:18 +000051_base_url = None
showardc85c21b2008-11-24 22:17:37 +000052_notify_email_statuses = []
showard170873e2009-01-07 00:22:26 +000053_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000054
55
56def main():
jadmanski0afbb632008-06-06 21:10:57 +000057 usage = 'usage: %prog [options] results_dir'
mbligh36768f02008-02-22 18:28:33 +000058
jadmanski0afbb632008-06-06 21:10:57 +000059 parser = optparse.OptionParser(usage)
60 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
61 action='store_true')
62 parser.add_option('--logfile', help='Set a log file that all stdout ' +
63 'should be redirected to. Stderr will go to this ' +
64 'file + ".err"')
65 parser.add_option('--test', help='Indicate that scheduler is under ' +
66 'test and should use dummy autoserv and no parsing',
67 action='store_true')
68 (options, args) = parser.parse_args()
69 if len(args) != 1:
70 parser.print_usage()
71 return
mbligh36768f02008-02-22 18:28:33 +000072
jadmanski0afbb632008-06-06 21:10:57 +000073 global RESULTS_DIR
74 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +000075
showardcca334f2009-03-12 20:38:34 +000076 # Change the cwd while running to avoid issues incase we were launched from
77 # somewhere odd (such as a random NFS home directory of the person running
78 # sudo to launch us as the appropriate user).
79 os.chdir(RESULTS_DIR)
80
jadmanski0afbb632008-06-06 21:10:57 +000081 c = global_config.global_config
showardd1ee1dd2009-01-07 21:33:08 +000082 notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
83 "notify_email_statuses",
84 default='')
showardc85c21b2008-11-24 22:17:37 +000085 global _notify_email_statuses
showard170873e2009-01-07 00:22:26 +000086 _notify_email_statuses = [status for status in
87 re.split(r'[\s,;:]', notify_statuses_list.lower())
88 if status]
showardc85c21b2008-11-24 22:17:37 +000089
jadmanski0afbb632008-06-06 21:10:57 +000090 if options.test:
91 global _autoserv_path
92 _autoserv_path = 'autoserv_dummy'
93 global _testing_mode
94 _testing_mode = True
mbligh36768f02008-02-22 18:28:33 +000095
mbligh37eceaa2008-12-15 22:56:37 +000096 # AUTOTEST_WEB.base_url is still a supported config option as some people
97 # may wish to override the entire url.
showard542e8402008-09-19 20:16:18 +000098 global _base_url
showard170873e2009-01-07 00:22:26 +000099 config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
100 default='')
mbligh37eceaa2008-12-15 22:56:37 +0000101 if config_base_url:
102 _base_url = config_base_url
showard542e8402008-09-19 20:16:18 +0000103 else:
mbligh37eceaa2008-12-15 22:56:37 +0000104 # For the common case of everything running on a single server you
105 # can just set the hostname in a single place in the config file.
106 server_name = c.get_config_value('SERVER', 'hostname')
107 if not server_name:
108 print 'Error: [SERVER] hostname missing from the config file.'
109 sys.exit(1)
110 _base_url = 'http://%s/afe/' % server_name
showard542e8402008-09-19 20:16:18 +0000111
showardc5afc462009-01-13 00:09:39 +0000112 server = status_server.StatusServer(_drone_manager)
showardd1ee1dd2009-01-07 21:33:08 +0000113 server.start()
114
jadmanski0afbb632008-06-06 21:10:57 +0000115 try:
showardc5afc462009-01-13 00:09:39 +0000116 init(options.logfile)
117 dispatcher = Dispatcher()
118 dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)
119
jadmanski0afbb632008-06-06 21:10:57 +0000120 while not _shutdown:
121 dispatcher.tick()
showardd1ee1dd2009-01-07 21:33:08 +0000122 time.sleep(scheduler_config.config.tick_pause_sec)
jadmanski0afbb632008-06-06 21:10:57 +0000123 except:
showard170873e2009-01-07 00:22:26 +0000124 email_manager.manager.log_stacktrace(
125 "Uncaught exception; terminating monitor_db")
jadmanski0afbb632008-06-06 21:10:57 +0000126
showard170873e2009-01-07 00:22:26 +0000127 email_manager.manager.send_queued_emails()
showard55b4b542009-01-08 23:30:30 +0000128 server.shutdown()
showard170873e2009-01-07 00:22:26 +0000129 _drone_manager.shutdown()
jadmanski0afbb632008-06-06 21:10:57 +0000130 _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000131
132
133def handle_sigint(signum, frame):
jadmanski0afbb632008-06-06 21:10:57 +0000134 global _shutdown
135 _shutdown = True
136 print "Shutdown request received."
mbligh36768f02008-02-22 18:28:33 +0000137
138
139def init(logfile):
jadmanski0afbb632008-06-06 21:10:57 +0000140 if logfile:
141 enable_logging(logfile)
142 print "%s> dispatcher starting" % time.strftime("%X %x")
143 print "My PID is %d" % os.getpid()
mbligh36768f02008-02-22 18:28:33 +0000144
showardb1e51872008-10-07 11:08:18 +0000145 if _testing_mode:
146 global_config.global_config.override_config_value(
showard170873e2009-01-07 00:22:26 +0000147 DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')
showardb1e51872008-10-07 11:08:18 +0000148
jadmanski0afbb632008-06-06 21:10:57 +0000149 os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
150 global _db
showard170873e2009-01-07 00:22:26 +0000151 _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
jadmanski0afbb632008-06-06 21:10:57 +0000152 _db.connect()
mbligh36768f02008-02-22 18:28:33 +0000153
showardfa8629c2008-11-04 16:51:23 +0000154 # ensure Django connection is in autocommit
155 setup_django_environment.enable_autocommit()
156
showard2bab8f42008-11-12 18:15:22 +0000157 debug.configure('scheduler', format_string='%(message)s')
showard67831ae2009-01-16 03:07:38 +0000158 debug.get_logger().setLevel(logging.INFO)
showard2bab8f42008-11-12 18:15:22 +0000159
jadmanski0afbb632008-06-06 21:10:57 +0000160 print "Setting signal handler"
161 signal.signal(signal.SIGINT, handle_sigint)
162
showardd1ee1dd2009-01-07 21:33:08 +0000163 drones = global_config.global_config.get_config_value(
164 scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
165 drone_list = [hostname.strip() for hostname in drones.split(',')]
showard170873e2009-01-07 00:22:26 +0000166 results_host = global_config.global_config.get_config_value(
showardd1ee1dd2009-01-07 21:33:08 +0000167 scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
showard170873e2009-01-07 00:22:26 +0000168 _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)
169
jadmanski0afbb632008-06-06 21:10:57 +0000170 print "Connected! Running..."
mbligh36768f02008-02-22 18:28:33 +0000171
172
173def enable_logging(logfile):
jadmanski0afbb632008-06-06 21:10:57 +0000174 out_file = logfile
175 err_file = "%s.err" % logfile
176 print "Enabling logging to %s (%s)" % (out_file, err_file)
177 out_fd = open(out_file, "a", buffering=0)
178 err_fd = open(err_file, "a", buffering=0)
mbligh36768f02008-02-22 18:28:33 +0000179
jadmanski0afbb632008-06-06 21:10:57 +0000180 os.dup2(out_fd.fileno(), sys.stdout.fileno())
181 os.dup2(err_fd.fileno(), sys.stderr.fileno())
mbligh36768f02008-02-22 18:28:33 +0000182
jadmanski0afbb632008-06-06 21:10:57 +0000183 sys.stdout = out_fd
184 sys.stderr = err_fd
mbligh36768f02008-02-22 18:28:33 +0000185
186
def queue_entries_to_abort():
    """Return a HostQueueEntry for each entry currently in status 'Abort'."""
    rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE status='Abort';
            """)

    entries = [HostQueueEntry(row=row) for row in rows]
    return entries
mbligh36768f02008-02-22 18:28:33 +0000194
showard7cf9a9b2008-05-15 21:15:52 +0000195
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""
198
199
class HostScheduler(object):
    """
    Matches pending HostQueueEntries to Ready hosts, honoring ACLs, dependency
    labels, only_if_needed labels and atomic groups.  refresh() caches the
    relevant DB state once per scheduling cycle; the find_* methods then
    consume that cache, removing hosts from it as they are handed out.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready, inactive hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render a sequence of ids as a comma-separated string for SQL IN()."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """
        Run query (whose single %s is filled with id_list) and fold the
        two-column result into an id -> set-of-ids mapping.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold (left_id, right_id) rows into {left: set(rights)} (or flipped)."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map each job id to the ACL group ids its owner belongs to."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map each job id to host ids it must not run on."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map each job id to the label ids it depends on."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map each host id to the ACL group ids containing it."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (label_id -> host ids, host_id -> label ids) for host_ids."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for all labels."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Cache host/ACL/label/dependency state for one scheduling cycle."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host carries every label the job depends on."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """
        True unless the host has an only_if_needed label that the job neither
        depends on nor requested via its metahost.
        """
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                 or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """Combine all per-host eligibility checks for this queue entry."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's specific host if it is eligible, else None."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim any eligible host carrying the entry's metahost label."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Find a single eligible Host for a (non-atomic-group) queue entry."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            # BUG FIX: source had a corrupted "bprint" here, which would raise
            # NameError instead of reporting the error.
            print('Error: job %d synch_count=%d > requested atomic_group %d '
                  'max_number_of_machines=%d. Aborted host_queue_entry %d.' %
                  (job.id, job.synch_count, atomic_group.id,
                   atomic_group.max_number_of_machines, queue_entry.id))
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = random.sample(
                    eligible_hosts_in_group, max_hosts)

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
543
544
showard170873e2009-01-07 00:22:26 +0000545class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000546 def __init__(self):
547 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000548 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000549 self._host_scheduler = HostScheduler()
showard170873e2009-01-07 00:22:26 +0000550 self._host_agents = {}
551 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000552
mbligh36768f02008-02-22 18:28:33 +0000553
jadmanski0afbb632008-06-06 21:10:57 +0000554 def do_initial_recovery(self, recover_hosts=True):
555 # always recover processes
556 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000557
jadmanski0afbb632008-06-06 21:10:57 +0000558 if recover_hosts:
559 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000560
561
jadmanski0afbb632008-06-06 21:10:57 +0000562 def tick(self):
showard170873e2009-01-07 00:22:26 +0000563 _drone_manager.refresh()
showarda3ab0d52008-11-03 19:03:47 +0000564 self._run_cleanup_maybe()
jadmanski0afbb632008-06-06 21:10:57 +0000565 self._find_aborting()
566 self._schedule_new_jobs()
567 self._handle_agents()
showard170873e2009-01-07 00:22:26 +0000568 _drone_manager.execute_actions()
569 email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000570
showard97aed502008-11-04 02:01:24 +0000571
showarda3ab0d52008-11-03 19:03:47 +0000572 def _run_cleanup_maybe(self):
showardd1ee1dd2009-01-07 21:33:08 +0000573 should_cleanup = (self._last_clean_time +
574 scheduler_config.config.clean_interval * 60 <
575 time.time())
576 if should_cleanup:
showarda3ab0d52008-11-03 19:03:47 +0000577 print 'Running cleanup'
578 self._abort_timed_out_jobs()
579 self._abort_jobs_past_synch_start_timeout()
580 self._clear_inactive_blocks()
showardfa8629c2008-11-04 16:51:23 +0000581 self._check_for_db_inconsistencies()
showarda3ab0d52008-11-03 19:03:47 +0000582 self._last_clean_time = time.time()
583
mbligh36768f02008-02-22 18:28:33 +0000584
showard170873e2009-01-07 00:22:26 +0000585 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
586 for object_id in object_ids:
587 agent_dict.setdefault(object_id, set()).add(agent)
588
589
590 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
591 for object_id in object_ids:
592 assert object_id in agent_dict
593 agent_dict[object_id].remove(agent)
594
595
jadmanski0afbb632008-06-06 21:10:57 +0000596 def add_agent(self, agent):
597 self._agents.append(agent)
598 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000599 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
600 self._register_agent_for_ids(self._queue_entry_agents,
601 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000602
showard170873e2009-01-07 00:22:26 +0000603
604 def get_agents_for_entry(self, queue_entry):
605 """
606 Find agents corresponding to the specified queue_entry.
607 """
608 return self._queue_entry_agents.get(queue_entry.id, set())
609
610
611 def host_has_agent(self, host):
612 """
613 Determine if there is currently an Agent present using this host.
614 """
615 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000616
617
jadmanski0afbb632008-06-06 21:10:57 +0000618 def remove_agent(self, agent):
619 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000620 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
621 agent)
622 self._unregister_agent_for_ids(self._queue_entry_agents,
623 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000624
625
showard4c5374f2008-09-04 17:02:56 +0000626 def num_running_processes(self):
627 return sum(agent.num_processes for agent in self._agents
628 if agent.is_running())
mblighbb421852008-03-11 22:36:16 +0000629
630
showard170873e2009-01-07 00:22:26 +0000631 def _extract_execution_tag(self, command_line):
632 match = re.match(r'.* -P (\S+) ', command_line)
633 if not match:
634 return None
635 return match.group(1)
mblighbb421852008-03-11 22:36:16 +0000636
637
showard2bab8f42008-11-12 18:15:22 +0000638 def _recover_queue_entries(self, queue_entries, run_monitor):
639 assert len(queue_entries) > 0
showard2bab8f42008-11-12 18:15:22 +0000640 queue_task = RecoveryQueueTask(job=queue_entries[0].job,
641 queue_entries=queue_entries,
642 run_monitor=run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +0000643 self.add_agent(Agent(tasks=[queue_task],
showard170873e2009-01-07 00:22:26 +0000644 num_processes=len(queue_entries)))
mblighbb421852008-03-11 22:36:16 +0000645
646
jadmanski0afbb632008-06-06 21:10:57 +0000647 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000648 self._register_pidfiles()
649 _drone_manager.refresh()
650 self._recover_running_entries()
651 self._recover_aborting_entries()
652 self._requeue_other_active_entries()
653 self._recover_parsing_entries()
654 self._reverify_remaining_hosts()
655 # reinitialize drones after killing orphaned processes, since they can
656 # leave around files when they die
657 _drone_manager.execute_actions()
658 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000659
showard170873e2009-01-07 00:22:26 +0000660
661 def _register_pidfiles(self):
662 # during recovery we may need to read pidfiles for both running and
663 # parsing entries
664 queue_entries = HostQueueEntry.fetch(
665 where="status IN ('Running', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000666 for queue_entry in queue_entries:
showard170873e2009-01-07 00:22:26 +0000667 pidfile_id = _drone_manager.get_pidfile_id_from(
668 queue_entry.execution_tag())
669 _drone_manager.register_pidfile(pidfile_id)
670
671
672 def _recover_running_entries(self):
673 orphans = _drone_manager.get_orphaned_autoserv_processes()
674
675 queue_entries = HostQueueEntry.fetch(where="status = 'Running'")
676 requeue_entries = []
677 for queue_entry in queue_entries:
678 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000679 # synchronous job we've already recovered
680 continue
showard170873e2009-01-07 00:22:26 +0000681 execution_tag = queue_entry.execution_tag()
682 run_monitor = PidfileRunMonitor()
683 run_monitor.attach_to_existing_process(execution_tag)
684 if not run_monitor.has_process():
685 # autoserv apparently never got run, so let it get requeued
686 continue
showarde788ea62008-11-17 21:02:47 +0000687 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000688 print 'Recovering %s (process %s)' % (
689 ', '.join(str(entry) for entry in queue_entries),
690 run_monitor.get_process())
showard2bab8f42008-11-12 18:15:22 +0000691 self._recover_queue_entries(queue_entries, run_monitor)
showard170873e2009-01-07 00:22:26 +0000692 orphans.pop(execution_tag, None)
mbligh90a549d2008-03-25 23:52:34 +0000693
jadmanski0afbb632008-06-06 21:10:57 +0000694 # now kill any remaining autoserv processes
showard170873e2009-01-07 00:22:26 +0000695 for process in orphans.itervalues():
696 print 'Killing orphan %s' % process
697 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000698
showard170873e2009-01-07 00:22:26 +0000699
700 def _recover_aborting_entries(self):
701 queue_entries = HostQueueEntry.fetch(
702 where='status IN ("Abort", "Aborting")')
jadmanski0afbb632008-06-06 21:10:57 +0000703 for queue_entry in queue_entries:
showard170873e2009-01-07 00:22:26 +0000704 print 'Recovering aborting QE %s' % queue_entry
705 agent = queue_entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000706
showard97aed502008-11-04 02:01:24 +0000707
showard170873e2009-01-07 00:22:26 +0000708 def _requeue_other_active_entries(self):
709 queue_entries = HostQueueEntry.fetch(
710 where='active AND NOT complete AND status != "Pending"')
711 for queue_entry in queue_entries:
712 if self.get_agents_for_entry(queue_entry):
713 # entry has already been recovered
714 continue
715 print 'Requeuing active QE %s (status=%s)' % (queue_entry,
716 queue_entry.status)
717 if queue_entry.host:
718 tasks = queue_entry.host.reverify_tasks()
719 self.add_agent(Agent(tasks))
720 agent = queue_entry.requeue()
721
722
723 def _reverify_remaining_hosts(self):
showard45ae8192008-11-05 19:32:53 +0000724 # reverify hosts that were in the middle of verify, repair or cleanup
jadmanski0afbb632008-06-06 21:10:57 +0000725 self._reverify_hosts_where("""(status = 'Repairing' OR
726 status = 'Verifying' OR
showard170873e2009-01-07 00:22:26 +0000727 status = 'Cleaning')""")
jadmanski0afbb632008-06-06 21:10:57 +0000728
showard170873e2009-01-07 00:22:26 +0000729 # recover "Running" hosts with no active queue entries, although this
730 # should never happen
731 message = ('Recovering running host %s - this probably indicates a '
732 'scheduler bug')
jadmanski0afbb632008-06-06 21:10:57 +0000733 self._reverify_hosts_where("""status = 'Running' AND
734 id NOT IN (SELECT host_id
735 FROM host_queue_entries
736 WHERE active)""",
737 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000738
739
jadmanski0afbb632008-06-06 21:10:57 +0000740 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000741 print_message='Reverifying host %s'):
742 full_where='locked = 0 AND invalid = 0 AND ' + where
743 for host in Host.fetch(where=full_where):
744 if self.host_has_agent(host):
745 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000746 continue
showard170873e2009-01-07 00:22:26 +0000747 if print_message:
jadmanski0afbb632008-06-06 21:10:57 +0000748 print print_message % host.hostname
showard170873e2009-01-07 00:22:26 +0000749 tasks = host.reverify_tasks()
750 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000751
752
showard97aed502008-11-04 02:01:24 +0000753 def _recover_parsing_entries(self):
showard2bab8f42008-11-12 18:15:22 +0000754 recovered_entry_ids = set()
showard97aed502008-11-04 02:01:24 +0000755 for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
showard2bab8f42008-11-12 18:15:22 +0000756 if entry.id in recovered_entry_ids:
757 continue
758 queue_entries = entry.job.get_group_entries(entry)
showard170873e2009-01-07 00:22:26 +0000759 recovered_entry_ids = recovered_entry_ids.union(
760 entry.id for entry in queue_entries)
761 print 'Recovering parsing entries %s' % (
762 ', '.join(str(entry) for entry in queue_entries))
showard97aed502008-11-04 02:01:24 +0000763
764 reparse_task = FinalReparseTask(queue_entries)
showard170873e2009-01-07 00:22:26 +0000765 self.add_agent(Agent([reparse_task], num_processes=0))
showard97aed502008-11-04 02:01:24 +0000766
767
jadmanski0afbb632008-06-06 21:10:57 +0000768 def _recover_hosts(self):
769 # recover "Repair Failed" hosts
770 message = 'Reverifying dead host %s'
771 self._reverify_hosts_where("status = 'Repair Failed'",
772 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000773
774
showard3bb499f2008-07-03 19:42:20 +0000775 def _abort_timed_out_jobs(self):
776 """
777 Aborts all jobs that have timed out and not completed
778 """
showarda3ab0d52008-11-03 19:03:47 +0000779 query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
780 where=['created_on + INTERVAL timeout HOUR < NOW()'])
781 for job in query.distinct():
782 print 'Aborting job %d due to job timeout' % job.id
783 job.abort(None)
showard3bb499f2008-07-03 19:42:20 +0000784
785
showard98863972008-10-29 21:14:56 +0000786 def _abort_jobs_past_synch_start_timeout(self):
787 """
788 Abort synchronous jobs that are past the start timeout (from global
789 config) and are holding a machine that's in everyone.
790 """
791 timeout_delta = datetime.timedelta(
showardd1ee1dd2009-01-07 21:33:08 +0000792 minutes=scheduler_config.config.synch_job_start_timeout_minutes)
showard98863972008-10-29 21:14:56 +0000793 timeout_start = datetime.datetime.now() - timeout_delta
794 query = models.Job.objects.filter(
showard98863972008-10-29 21:14:56 +0000795 created_on__lt=timeout_start,
796 hostqueueentry__status='Pending',
showardd9ac4452009-02-07 02:04:37 +0000797 hostqueueentry__host__aclgroup__name='Everyone')
showard98863972008-10-29 21:14:56 +0000798 for job in query.distinct():
799 print 'Aborting job %d due to start timeout' % job.id
showardff059d72008-12-03 18:18:53 +0000800 entries_to_abort = job.hostqueueentry_set.exclude(
801 status=models.HostQueueEntry.Status.RUNNING)
802 for queue_entry in entries_to_abort:
803 queue_entry.abort(None)
showard98863972008-10-29 21:14:56 +0000804
805
    def _clear_inactive_blocks(self):
        """
        Clear out blocks for all completed jobs.

        Deletes every ineligible_host_queues row whose job has no remaining
        incomplete queue entries, so finished jobs stop blocking hosts.
        """
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        _db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")
showard04c82c52008-05-29 19:38:12 +0000818
819
showardb95b1bd2008-08-15 18:11:04 +0000820 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000821 # prioritize by job priority, then non-metahost over metahost, then FIFO
822 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000823 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000824 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000825 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000826
827
showard89f84db2009-03-12 20:39:13 +0000828 def _refresh_pending_queue_entries(self):
829 """
830 Lookup the pending HostQueueEntries and call our HostScheduler
831 refresh() method given that list. Return the list.
832
833 @returns A list of pending HostQueueEntries sorted in priority order.
834 """
showard63a34772008-08-18 19:32:50 +0000835 queue_entries = self._get_pending_queue_entries()
836 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000837 return []
showardb95b1bd2008-08-15 18:11:04 +0000838
showard63a34772008-08-18 19:32:50 +0000839 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000840
showard89f84db2009-03-12 20:39:13 +0000841 return queue_entries
842
843
844 def _schedule_atomic_group(self, queue_entry):
845 """
846 Schedule the given queue_entry on an atomic group of hosts.
847
848 Returns immediately if there are insufficient available hosts.
849
850 Creates new HostQueueEntries based off of queue_entry for the
851 scheduled hosts and starts them all running.
852 """
853 # This is a virtual host queue entry representing an entire
854 # atomic group, find a group and schedule their hosts.
855 group_hosts = self._host_scheduler.find_eligible_atomic_group(
856 queue_entry)
857 if not group_hosts:
858 return
859 # The first assigned host uses the original HostQueueEntry
860 group_queue_entries = [queue_entry]
861 for assigned_host in group_hosts[1:]:
862 # Create a new HQE for every additional assigned_host.
863 new_hqe = HostQueueEntry.clone(queue_entry)
864 new_hqe.save()
865 group_queue_entries.append(new_hqe)
866 assert len(group_queue_entries) == len(group_hosts)
867 for queue_entry, host in itertools.izip(group_queue_entries,
868 group_hosts):
869 self._run_queue_entry(queue_entry, host)
870
871
872 def _schedule_new_jobs(self):
873 queue_entries = self._refresh_pending_queue_entries()
874 if not queue_entries:
875 return
876
showard63a34772008-08-18 19:32:50 +0000877 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000878 if (queue_entry.atomic_group_id is None or
879 queue_entry.host_id is not None):
880 assigned_host = self._host_scheduler.find_eligible_host(
881 queue_entry)
882 if assigned_host:
883 self._run_queue_entry(queue_entry, assigned_host)
884 else:
885 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000886
887
888 def _run_queue_entry(self, queue_entry, host):
889 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000890 # in some cases (synchronous jobs with run_verify=False), agent may be
891 # None
showard9976ce92008-10-15 20:28:13 +0000892 if agent:
893 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000894
895
jadmanski0afbb632008-06-06 21:10:57 +0000896 def _find_aborting(self):
jadmanski0afbb632008-06-06 21:10:57 +0000897 for entry in queue_entries_to_abort():
showard170873e2009-01-07 00:22:26 +0000898 agents_to_abort = list(self.get_agents_for_entry(entry))
showard1be97432008-10-17 15:30:45 +0000899 for agent in agents_to_abort:
900 self.remove_agent(agent)
901
showard170873e2009-01-07 00:22:26 +0000902 entry.abort(self, agents_to_abort)
jadmanski0afbb632008-06-06 21:10:57 +0000903
904
showard324bf812009-01-20 23:23:38 +0000905 def _can_start_agent(self, agent, num_started_this_cycle,
906 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000907 # always allow zero-process agents to run
908 if agent.num_processes == 0:
909 return True
910 # don't allow any nonzero-process agents to run after we've reached a
911 # limit (this avoids starvation of many-process agents)
912 if have_reached_limit:
913 return False
914 # total process throttling
showard324bf812009-01-20 23:23:38 +0000915 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000916 return False
917 # if a single agent exceeds the per-cycle throttling, still allow it to
918 # run when it's the first agent in the cycle
919 if num_started_this_cycle == 0:
920 return True
921 # per-cycle throttling
922 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000923 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000924 return False
925 return True
926
927
jadmanski0afbb632008-06-06 21:10:57 +0000928 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000929 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000930 have_reached_limit = False
931 # iterate over copy, so we can remove agents during iteration
932 for agent in list(self._agents):
933 if agent.is_done():
jadmanski0afbb632008-06-06 21:10:57 +0000934 print "agent finished"
showard170873e2009-01-07 00:22:26 +0000935 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000936 continue
937 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000938 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000939 have_reached_limit):
940 have_reached_limit = True
941 continue
showard4c5374f2008-09-04 17:02:56 +0000942 num_started_this_cycle += agent.num_processes
943 agent.tick()
showard324bf812009-01-20 23:23:38 +0000944 print _drone_manager.total_running_processes(), 'running processes'
mbligh36768f02008-02-22 18:28:33 +0000945
946
showardfa8629c2008-11-04 16:51:23 +0000947 def _check_for_db_inconsistencies(self):
948 query = models.HostQueueEntry.objects.filter(active=True, complete=True)
949 if query.count() != 0:
950 subject = ('%d queue entries found with active=complete=1'
951 % query.count())
952 message = '\n'.join(str(entry.get_object_dict())
953 for entry in query[:50])
954 if len(query) > 50:
955 message += '\n(truncated)\n'
956
957 print subject
showard170873e2009-01-07 00:22:26 +0000958 email_manager.manager.enqueue_notify_email(subject, message)
showardfa8629c2008-11-04 16:51:23 +0000959
960
showard170873e2009-01-07 00:22:26 +0000961class PidfileRunMonitor(object):
962 """
963 Client must call either run() to start a new process or
964 attach_to_existing_process().
965 """
mbligh36768f02008-02-22 18:28:33 +0000966
showard170873e2009-01-07 00:22:26 +0000967 class _PidfileException(Exception):
968 """
969 Raised when there's some unexpected behavior with the pid file, but only
970 used internally (never allowed to escape this class).
971 """
mbligh36768f02008-02-22 18:28:33 +0000972
973
showard170873e2009-01-07 00:22:26 +0000974 def __init__(self):
showard35162b02009-03-03 02:17:30 +0000975 self.lost_process = False
showard170873e2009-01-07 00:22:26 +0000976 self._start_time = None
977 self.pidfile_id = None
978 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +0000979
980
showard170873e2009-01-07 00:22:26 +0000981 def _add_nice_command(self, command, nice_level):
982 if not nice_level:
983 return command
984 return ['nice', '-n', str(nice_level)] + command
985
986
987 def _set_start_time(self):
988 self._start_time = time.time()
989
990
991 def run(self, command, working_directory, nice_level=None, log_file=None,
992 pidfile_name=None, paired_with_pidfile=None):
993 assert command is not None
994 if nice_level is not None:
995 command = ['nice', '-n', str(nice_level)] + command
996 self._set_start_time()
997 self.pidfile_id = _drone_manager.execute_command(
998 command, working_directory, log_file=log_file,
999 pidfile_name=pidfile_name, paired_with_pidfile=paired_with_pidfile)
1000
1001
1002 def attach_to_existing_process(self, execution_tag):
1003 self._set_start_time()
1004 self.pidfile_id = _drone_manager.get_pidfile_id_from(execution_tag)
1005 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001006
1007
jadmanski0afbb632008-06-06 21:10:57 +00001008 def kill(self):
showard170873e2009-01-07 00:22:26 +00001009 if self.has_process():
1010 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001011
mbligh36768f02008-02-22 18:28:33 +00001012
showard170873e2009-01-07 00:22:26 +00001013 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001014 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001015 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001016
1017
showard170873e2009-01-07 00:22:26 +00001018 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001019 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001020 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001021 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001022
1023
showard170873e2009-01-07 00:22:26 +00001024 def _read_pidfile(self, use_second_read=False):
1025 assert self.pidfile_id is not None, (
1026 'You must call run() or attach_to_existing_process()')
1027 contents = _drone_manager.get_pidfile_contents(
1028 self.pidfile_id, use_second_read=use_second_read)
1029 if contents.is_invalid():
1030 self._state = drone_manager.PidfileContents()
1031 raise self._PidfileException(contents)
1032 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001033
1034
showard21baa452008-10-21 00:08:39 +00001035 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001036 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1037 self._state.process, self.pidfile_id, message)
showard21baa452008-10-21 00:08:39 +00001038 print message
showard170873e2009-01-07 00:22:26 +00001039 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001040 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001041
1042
1043 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001044 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001045 return
mblighbb421852008-03-11 22:36:16 +00001046
showard21baa452008-10-21 00:08:39 +00001047 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001048
showard170873e2009-01-07 00:22:26 +00001049 if self._state.process is None:
1050 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001051 return
mbligh90a549d2008-03-25 23:52:34 +00001052
showard21baa452008-10-21 00:08:39 +00001053 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001054 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001055 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001056 return
mbligh90a549d2008-03-25 23:52:34 +00001057
showard170873e2009-01-07 00:22:26 +00001058 # pid but no running process - maybe process *just* exited
1059 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001060 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001061 # autoserv exited without writing an exit code
1062 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001063 self._handle_pidfile_error(
1064 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001065
showard21baa452008-10-21 00:08:39 +00001066
1067 def _get_pidfile_info(self):
1068 """\
1069 After completion, self._state will contain:
1070 pid=None, exit_status=None if autoserv has not yet run
1071 pid!=None, exit_status=None if autoserv is running
1072 pid!=None, exit_status!=None if autoserv has completed
1073 """
1074 try:
1075 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001076 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001077 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001078
1079
showard170873e2009-01-07 00:22:26 +00001080 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001081 """\
1082 Called when no pidfile is found or no pid is in the pidfile.
1083 """
showard170873e2009-01-07 00:22:26 +00001084 message = 'No pid found at %s' % self.pidfile_id
jadmanski0afbb632008-06-06 21:10:57 +00001085 print message
showard170873e2009-01-07 00:22:26 +00001086 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1087 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001088 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001089 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001090
1091
showard35162b02009-03-03 02:17:30 +00001092 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001093 """\
1094 Called when autoserv has exited without writing an exit status,
1095 or we've timed out waiting for autoserv to write a pid to the
1096 pidfile. In either case, we just return failure and the caller
1097 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001098
showard170873e2009-01-07 00:22:26 +00001099 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001100 """
1101 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001102 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001103 self._state.exit_status = 1
1104 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001105
1106
jadmanski0afbb632008-06-06 21:10:57 +00001107 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001108 self._get_pidfile_info()
1109 return self._state.exit_status
1110
1111
1112 def num_tests_failed(self):
1113 self._get_pidfile_info()
1114 assert self._state.num_tests_failed is not None
1115 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001116
1117
class Agent(object):
    """
    Runs an ordered queue of AgentTasks on behalf of the dispatcher.

    One task is active at a time; when the active task fails, the remaining
    queue is replaced by that task's failure_tasks.  num_processes is how
    many autoserv processes this agent counts for in dispatcher throttling.
    """
    def __init__(self, tasks, num_processes=1):
        # task currently being polled, or None between tasks
        self.active_task = None
        self.queue = Queue.Queue(0)
        # set by the dispatcher when this agent is added
        self.dispatcher = None
        self.num_processes = num_processes

        # union of ids across all tasks; lets the dispatcher find the agents
        # covering a given queue entry or host
        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        for task in tasks:
            self.add_task(task)


    def _union_ids(self, id_lists):
        """Flatten an iterable of id lists into a single set."""
        return set(itertools.chain(*id_lists))


    def add_task(self, task):
        """Append task to the queue and point it back at this agent."""
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        """
        Poll the active task, advancing through completed tasks until one
        is still in progress (then return) or the queue is exhausted.
        """
        while not self.is_done():
            if self.active_task and not self.active_task.is_done():
                self.active_task.poll()
                if not self.active_task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        """Retire the finished active task and start the next one, if any."""
        print "agent picking task"
        if self.active_task:
            assert self.active_task.is_done()

            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        """Drop all remaining tasks; queue the failed task's failure_tasks."""
        self.queue = Queue.Queue(0)
        for task in self.active_task.failure_tasks:
            self.add_task(task)


    def is_running(self):
        # "running" means some task is currently active
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        """Begin the first task; dispatcher must already be assigned."""
        assert self.dispatcher

        self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001184
jadmanski0afbb632008-06-06 21:10:57 +00001185
class AgentTask(object):
    """
    A single unit of work executed by an Agent, usually by running a
    command through the drone manager and monitoring it via its pidfile.
    """
    def __init__(self, cmd, working_directory=None, failure_tasks=None):
        """
        @param cmd: argv list to execute, or None for tasks that run no
                process.
        @param working_directory: directory the process runs in (passed
                through to the drone manager).
        @param failure_tasks: tasks the owning Agent runs instead if this
                task fails.
        """
        self.done = False
        # fix: the original used a shared mutable default ([]) for
        # failure_tasks; give each instance its own fresh list instead
        if failure_tasks is None:
            failure_tasks = []
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        # record which hosts/entries this task touches, so the dispatcher
        # can look up the agents covering them
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            # no process was ever started, so the task simply failed
            self.finished(False)


    def tick(self, exit_code):
        # None means the process hasn't exited yet
        if exit_code is None:
            return

        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        # hook for subclasses; runs once before the command starts
        pass


    def create_temp_resultsdir(self, suffix=''):
        # NOTE: suffix is retained for caller compatibility but is not used
        # here -- the drone manager hands out the temporary path
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        # preserve this task's log in the results repository, if it ran
        if self.monitor and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        # hook for subclasses; runs once after the task finishes
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        # timestamp prefix keeps successive runs from clobbering each other
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        """Return the shared execution tag, asserting all entries agree."""
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_and_parse_results(self, queue_entries):
        """Copy results to the repository and schedule a final reparse."""
        assert len(queue_entries) > 0
        assert self.monitor
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        # trailing slash copies the directory contents
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(self.monitor.get_process(),
                                                  results_path)

        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def run(self):
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001308
1309
1310class RepairTask(AgentTask):
showarde788ea62008-11-17 21:02:47 +00001311 def __init__(self, host, queue_entry=None):
jadmanski0afbb632008-06-06 21:10:57 +00001312 """\
showard170873e2009-01-07 00:22:26 +00001313 queue_entry: queue entry to mark failed if this repair fails.
jadmanski0afbb632008-06-06 21:10:57 +00001314 """
jadmanskifb7cfb12008-07-09 14:13:21 +00001315 protection = host_protections.Protection.get_string(host.protection)
jadmanski542537f2008-07-24 14:14:56 +00001316 # normalize the protection name
1317 protection = host_protections.Protection.get_attr_name(protection)
showard170873e2009-01-07 00:22:26 +00001318
jadmanski0afbb632008-06-06 21:10:57 +00001319 self.host = host
showarde788ea62008-11-17 21:02:47 +00001320 self.queue_entry = queue_entry
showard170873e2009-01-07 00:22:26 +00001321 self._set_ids(host=host, queue_entries=[queue_entry])
1322
1323 self.create_temp_resultsdir('.repair')
1324 cmd = [_autoserv_path , '-p', '-R', '-m', host.hostname,
1325 '-r', _drone_manager.absolute_path(self.temp_results_dir),
1326 '--host-protection', protection]
1327 super(RepairTask, self).__init__(cmd, self.temp_results_dir)
1328
1329 self._set_ids(host=host, queue_entries=[queue_entry])
1330 self.set_host_log_file('repair', self.host)
mblighe2586682008-02-29 22:45:46 +00001331
mbligh36768f02008-02-22 18:28:33 +00001332
jadmanski0afbb632008-06-06 21:10:57 +00001333 def prolog(self):
1334 print "repair_task starting"
1335 self.host.set_status('Repairing')
showarde788ea62008-11-17 21:02:47 +00001336 if self.queue_entry:
1337 self.queue_entry.requeue()
mbligh36768f02008-02-22 18:28:33 +00001338
1339
showardde634ee2009-01-30 01:44:24 +00001340 def _fail_queue_entry(self):
1341 assert self.queue_entry
1342 self.queue_entry.set_execution_subdir()
showard678df4f2009-02-04 21:36:39 +00001343 # copy results logs into the normal place for job results
1344 _drone_manager.copy_results_on_drone(
1345 self.monitor.get_process(),
1346 source_path=self.temp_results_dir + '/',
1347 destination_path=self.queue_entry.execution_tag() + '/')
1348
1349 self._copy_and_parse_results([self.queue_entry])
showardde634ee2009-01-30 01:44:24 +00001350 self.queue_entry.handle_host_failure()
1351
1352
jadmanski0afbb632008-06-06 21:10:57 +00001353 def epilog(self):
1354 super(RepairTask, self).epilog()
1355 if self.success:
1356 self.host.set_status('Ready')
1357 else:
1358 self.host.set_status('Repair Failed')
showarde788ea62008-11-17 21:02:47 +00001359 if self.queue_entry and not self.queue_entry.meta_host:
showardde634ee2009-01-30 01:44:24 +00001360 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001361
1362
class PreJobTask(AgentTask):
    """Base class for tasks that run before a job (e.g. verify)."""

    def epilog(self):
        """
        When a pre-job task fails for a non-metahost entry, copy its log
        into the entry's results so the user can see why the job never ran.
        """
        super(PreJobTask, self).epilog()
        if not self.queue_entry or self.success:
            return
        if self.queue_entry.meta_host:
            return
        self.queue_entry.set_execution_subdir()
        destination = os.path.join(self.queue_entry.execution_tag(),
                                   os.path.basename(self.log_file))
        _drone_manager.copy_to_results_repository(
            self.monitor.get_process(), self.log_file,
            destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001375
1376
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' against a host before a job may use it."""
    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry or host must be given; with a
        # queue_entry, its host is the one verified
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        # temp dir must exist before building cmd, which embeds its path
        self.create_temp_resultsdir('.verify')
        cmd = [_autoserv_path, '-p', '-v', '-m', self.host.hostname, '-r',
               _drone_manager.absolute_path(self.temp_results_dir)]
        # if verify fails, repair the host (which fails the entry should
        # repair also fail)
        failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(cmd, self.temp_results_dir,
                                         failure_tasks=failure_tasks)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        print "starting verify on %s" % (self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001407
1408
mbligh36768f02008-02-22 18:28:33 +00001409class QueueTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001410 def __init__(self, job, queue_entries, cmd):
jadmanski0afbb632008-06-06 21:10:57 +00001411 self.job = job
1412 self.queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001413 super(QueueTask, self).__init__(cmd, self._execution_tag())
1414 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001415
1416
showard170873e2009-01-07 00:22:26 +00001417 def _format_keyval(self, key, value):
1418 return '%s=%s' % (key, value)
mbligh36768f02008-02-22 18:28:33 +00001419
1420
showard73ec0442009-02-07 02:05:20 +00001421 def _keyval_path(self):
1422 return os.path.join(self._execution_tag(), 'keyval')
1423
1424
1425 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1426 keyval_contents = '\n'.join(self._format_keyval(key, value)
1427 for key, value in keyval_dict.iteritems())
1428 # always end with a newline to allow additional keyvals to be written
1429 keyval_contents += '\n'
1430 _drone_manager.attach_file_to_execution(self._execution_tag(),
1431 keyval_contents,
1432 file_path=keyval_path)
1433
1434
1435 def _write_keyvals_before_job(self, keyval_dict):
1436 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1437
1438
1439 def _write_keyval_after_job(self, field, value):
showard170873e2009-01-07 00:22:26 +00001440 assert self.monitor and self.monitor.has_process()
showard170873e2009-01-07 00:22:26 +00001441 _drone_manager.write_lines_to_file(
showard73ec0442009-02-07 02:05:20 +00001442 self._keyval_path(), [self._format_keyval(field, value)],
showard35162b02009-03-03 02:17:30 +00001443 paired_with_process=self.monitor.get_process())
showardd8e548a2008-09-09 03:04:57 +00001444
1445
showard170873e2009-01-07 00:22:26 +00001446 def _write_host_keyvals(self, host):
1447 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1448 host.hostname)
1449 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001450 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1451 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001452
1453
showard170873e2009-01-07 00:22:26 +00001454 def _execution_tag(self):
1455 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001456
1457
jadmanski0afbb632008-06-06 21:10:57 +00001458 def prolog(self):
showard73ec0442009-02-07 02:05:20 +00001459 queued = int(time.mktime(self.job.created_on.timetuple()))
1460 self._write_keyvals_before_job({'job_queued': queued})
jadmanski0afbb632008-06-06 21:10:57 +00001461 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001462 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001463 queue_entry.set_status('Running')
1464 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001465 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001466 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001467 assert len(self.queue_entries) == 1
1468 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001469
1470
showard35162b02009-03-03 02:17:30 +00001471 def _write_lost_process_error_file(self):
1472 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1473 _drone_manager.write_lines_to_file(error_file_path,
1474 [_LOST_PROCESS_ERROR])
1475
1476
showard97aed502008-11-04 02:01:24 +00001477 def _finish_task(self, success):
showard35162b02009-03-03 02:17:30 +00001478 if self.monitor.has_process():
1479 self._write_keyval_after_job("job_finished", int(time.time()))
1480 self._copy_and_parse_results(self.queue_entries)
1481
1482 if self.monitor.lost_process:
1483 self._write_lost_process_error_file()
1484 for queue_entry in self.queue_entries:
1485 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001486
1487
showardcbd74612008-11-19 21:42:02 +00001488 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001489 _drone_manager.write_lines_to_file(
1490 os.path.join(self._execution_tag(), 'status.log'),
1491 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001492 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001493
1494
jadmanskif7fa2cc2008-10-01 14:13:23 +00001495 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001496 if not self.monitor or not self.monitor.has_process():
1497 return
1498
jadmanskif7fa2cc2008-10-01 14:13:23 +00001499 # build up sets of all the aborted_by and aborted_on values
1500 aborted_by, aborted_on = set(), set()
1501 for queue_entry in self.queue_entries:
1502 if queue_entry.aborted_by:
1503 aborted_by.add(queue_entry.aborted_by)
1504 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1505 aborted_on.add(t)
1506
1507 # extract some actual, unique aborted by value and write it out
1508 assert len(aborted_by) <= 1
1509 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001510 aborted_by_value = aborted_by.pop()
1511 aborted_on_value = max(aborted_on)
1512 else:
1513 aborted_by_value = 'autotest_system'
1514 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001515
showarda0382352009-02-11 23:36:43 +00001516 self._write_keyval_after_job("aborted_by", aborted_by_value)
1517 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001518
showardcbd74612008-11-19 21:42:02 +00001519 aborted_on_string = str(datetime.datetime.fromtimestamp(
1520 aborted_on_value))
1521 self._write_status_comment('Job aborted by %s on %s' %
1522 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001523
1524
jadmanski0afbb632008-06-06 21:10:57 +00001525 def abort(self):
1526 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001527 self._log_abort()
showard97aed502008-11-04 02:01:24 +00001528 self._finish_task(False)
jadmanskic2ac77f2008-05-16 21:44:04 +00001529
1530
showard21baa452008-10-21 00:08:39 +00001531 def _reboot_hosts(self):
1532 reboot_after = self.job.reboot_after
1533 do_reboot = False
showard0fc38302008-10-23 00:44:07 +00001534 if reboot_after == models.RebootAfter.ALWAYS:
showard21baa452008-10-21 00:08:39 +00001535 do_reboot = True
showard0fc38302008-10-23 00:44:07 +00001536 elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
showard21baa452008-10-21 00:08:39 +00001537 num_tests_failed = self.monitor.num_tests_failed()
1538 do_reboot = (self.success and num_tests_failed == 0)
1539
showard8ebca792008-11-04 21:54:22 +00001540 for queue_entry in self.queue_entries:
1541 if do_reboot:
showard45ae8192008-11-05 19:32:53 +00001542 # don't pass the queue entry to the CleanupTask. if the cleanup
showardfa8629c2008-11-04 16:51:23 +00001543 # fails, the job doesn't care -- it's over.
showard45ae8192008-11-05 19:32:53 +00001544 cleanup_task = CleanupTask(host=queue_entry.get_host())
1545 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
showard8ebca792008-11-04 21:54:22 +00001546 else:
1547 queue_entry.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001548
1549
jadmanski0afbb632008-06-06 21:10:57 +00001550 def epilog(self):
1551 super(QueueTask, self).epilog()
showard97aed502008-11-04 02:01:24 +00001552 self._finish_task(self.success)
showard21baa452008-10-21 00:08:39 +00001553 self._reboot_hosts()
mblighbb421852008-03-11 22:36:16 +00001554
showard97aed502008-11-04 02:01:24 +00001555 print "queue_task finished with succes=%s" % self.success
mbligh36768f02008-02-22 18:28:33 +00001556
1557
class RecoveryQueueTask(QueueTask):
    """QueueTask variant used to re-attach to an autoserv process that was
    already running when the scheduler restarted."""
    def __init__(self, job, queue_entries, run_monitor):
        # cmd=None: we never launch a new process, we adopt run_monitor's.
        super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
        self.run_monitor = run_monitor


    def run(self):
        # adopt the pre-existing monitor instead of spawning autoserv
        self.monitor = self.run_monitor


    def prolog(self):
        # recovering an existing process - don't do prolog
        pass
mblighbb421852008-03-11 22:36:16 +00001571
1572
showard8fe93b52008-11-18 17:53:22 +00001573class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001574 def __init__(self, host=None, queue_entry=None):
1575 assert bool(host) ^ bool(queue_entry)
1576 if queue_entry:
1577 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001578 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001579 self.host = host
showard170873e2009-01-07 00:22:26 +00001580
1581 self.create_temp_resultsdir('.cleanup')
1582 self.cmd = [_autoserv_path, '-p', '--cleanup', '-m', host.hostname,
1583 '-r', _drone_manager.absolute_path(self.temp_results_dir)]
showarde788ea62008-11-17 21:02:47 +00001584 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001585 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1586 failure_tasks=[repair_task])
1587
1588 self._set_ids(host=host, queue_entries=[queue_entry])
1589 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001590
mblighd5c95802008-03-05 00:33:46 +00001591
jadmanski0afbb632008-06-06 21:10:57 +00001592 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001593 super(CleanupTask, self).prolog()
showard45ae8192008-11-05 19:32:53 +00001594 print "starting cleanup task for host: %s" % self.host.hostname
1595 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001596
mblighd5c95802008-03-05 00:33:46 +00001597
showard21baa452008-10-21 00:08:39 +00001598 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001599 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001600 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001601 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001602 self.host.update_field('dirty', 0)
1603
1604
mblighd5c95802008-03-05 00:33:46 +00001605class AbortTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001606 def __init__(self, queue_entry, agents_to_abort):
jadmanski0afbb632008-06-06 21:10:57 +00001607 super(AbortTask, self).__init__('')
showard170873e2009-01-07 00:22:26 +00001608 self.queue_entry = queue_entry
1609 # don't use _set_ids, since we don't want to set the host_ids
1610 self.queue_entry_ids = [queue_entry.id]
1611 self.agents_to_abort = agents_to_abort
mbligh36768f02008-02-22 18:28:33 +00001612
1613
jadmanski0afbb632008-06-06 21:10:57 +00001614 def prolog(self):
1615 print "starting abort on host %s, job %s" % (
1616 self.queue_entry.host_id, self.queue_entry.job_id)
mbligh36768f02008-02-22 18:28:33 +00001617
mblighd64e5702008-04-04 21:39:28 +00001618
jadmanski0afbb632008-06-06 21:10:57 +00001619 def epilog(self):
1620 super(AbortTask, self).epilog()
1621 self.queue_entry.set_status('Aborted')
1622 self.success = True
1623
1624
1625 def run(self):
1626 for agent in self.agents_to_abort:
1627 if (agent.active_task):
1628 agent.active_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001629
1630
class FinalReparseTask(AgentTask):
    """Runs the TKO parser over a completed execution's results.

    Parses are throttled globally: at most
    scheduler_config.config.max_parse_processes run at once, tracked by the
    class-level _num_running_parses counter.  poll()/run() keep retrying
    until a slot frees up.
    """
    # class-wide count of parser processes currently running
    _num_running_parses = 0

    def __init__(self, queue_entries):
        self._queue_entries = queue_entries
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]
        self._parse_started = False

        assert len(queue_entries) > 0
        queue_entry = queue_entries[0]

        self._execution_tag = queue_entry.execution_tag()
        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
        # attach to the finished autoserv run to read its exit status
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
        self._final_status = self._determine_final_status()

        # NOTE: in testing mode the superclass __init__ is skipped entirely
        # and only self.cmd is set; keep this branch structure intact.
        if _testing_mode:
            self.cmd = 'true'
        else:
            super(FinalReparseTask, self).__init__(
                cmd=self._generate_parse_command(),
                working_directory=self._execution_tag)

        self.log_file = os.path.join(self._execution_tag, '.parse.log')


    @classmethod
    def _increment_running_parses(cls):
        cls._num_running_parses += 1


    @classmethod
    def _decrement_running_parses(cls):
        cls._num_running_parses -= 1


    @classmethod
    def _can_run_new_parse(cls):
        return (cls._num_running_parses <
                scheduler_config.config.max_parse_processes)


    def _determine_final_status(self):
        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def prolog(self):
        super(FinalReparseTask, self).prolog()
        for queue_entry in self._queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        # final status was computed up front from autoserv's exit code
        for queue_entry in self._queue_entries:
            queue_entry.set_status(self._final_status)


    def _generate_parse_command(self):
        # -l 2: parse level; -r: reparse; -o: results dir is a single job
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                self._results_dir]


    def poll(self):
        # override poll to keep trying to start until the parse count goes down
        # and we can, at which point we revert to default behavior
        if self._parse_started:
            super(FinalReparseTask, self).poll()
        else:
            self._try_starting_parse()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_parse()


    def _try_starting_parse(self):
        if not self._can_run_new_parse():
            return

        # make sure we actually have results to parse
        # this should never happen in normal operation
        if not self._autoserv_monitor.has_process():
            email_manager.manager.enqueue_notify_email(
                'No results to parse',
                'No results to parse at %s' % self._autoserv_monitor.pidfile_id)
            self.finished(False)
            return

        # actually run the parse command
        self.monitor = PidfileRunMonitor()
        self.monitor.run(self.cmd, self._working_directory,
                         log_file=self.log_file,
                         pidfile_name='.parser_execute',
                         paired_with_pidfile=self._autoserv_monitor.pidfile_id)

        self._increment_running_parses()
        self._parse_started = True


    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        # only release a parse slot if we actually consumed one
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001741
1742
class SetEntryPendingTask(AgentTask):
    """Process-less task that transitions a queue entry to Pending."""

    def __init__(self, queue_entry):
        super(SetEntryPendingTask, self).__init__(cmd='')
        self._queue_entry = queue_entry
        self._set_ids(queue_entries=[queue_entry])


    def run(self):
        # on_pending() may hand back a follow-up agent to schedule
        next_agent = self._queue_entry.on_pending()
        if next_agent:
            self.agent.dispatcher.add_agent(next_agent)
        self.finished(True)
1755
1756
class DBError(Exception):
    """Raised when a DBObject's constructor SELECT finds no matching row."""
1759
1760
class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        # Don't forward constructor arguments to object.__new__(); it
        # ignores them at best (and raises TypeError on newer Pythons).
        return super(DBObject, cls).__new__(cls)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id - Primary key to SELECT by; mutually exclusive with row.
        @param row - A pre-fetched sequence of values matching _fields.
        @param new_record - True if this instance has no database row yet.
        @param always_query - When False, a cached already-initialized
                instance is returned untouched, without re-querying.

        @raises DBError if id is given but no matching row exists.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            rows = _db.execute(sql, (id,))
            if not rows:
                raise DBError("row not found (table=%s, id=%s)"
                              % (self.__table, id))
            row = rows[0]

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                print ('initialized %s %s instance requery is updating: %s' %
                       (type(self), self.id, differences))
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in memory fields.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is never a valid target for update_field()
        self._valid_fields.remove('id')


    def count(self, where, table = None):
        """Return the number of rows in `table` matching the WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value, condition=''):
        """Persist a single field change (no-op if the value is unchanged)."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        if condition:
            query += ' AND (%s)' % condition
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object's fields as a new row (new records only)."""
        if self.__new_record:
            keys = self._fields[1:]        # avoid id
            columns = ','.join([str(key) for key in keys])
            # NOTE: values are interpolated, not parameterized; only safe
            # because all values originate inside the scheduler.
            values = ['"%s"' % self.__dict__[key] for key in keys]
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this row and evict the instance from the shared cache."""
        # Fixed: this previously popped (type(self), id) -- the *builtin*
        # id function -- so deleted instances were never evicted and could
        # be resurrected by a later lookup.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        # leaves falsy values (empty string, None) untouched
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        for row in rows:
            yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00001944
mbligh36768f02008-02-22 18:28:33 +00001945
class IneligibleHostQueue(DBObject):
    """A row marking a (job, host) pair as blocked from scheduling."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00001949
1950
class AtomicGroup(DBObject):
    """ORM wrapper for a row of the atomic_groups table."""
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines')
1954
1955
class Label(DBObject):
    """ORM wrapper for a row of the labels table."""
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00001960
1961
mbligh36768f02008-02-22 18:28:33 +00001962class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00001963 _table_name = 'hosts'
1964 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
1965 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
1966
1967
jadmanski0afbb632008-06-06 21:10:57 +00001968 def current_task(self):
1969 rows = _db.execute("""
1970 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
1971 """, (self.id,))
1972
1973 if len(rows) == 0:
1974 return None
1975 else:
1976 assert len(rows) == 1
1977 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00001978 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00001979
1980
jadmanski0afbb632008-06-06 21:10:57 +00001981 def yield_work(self):
1982 print "%s yielding work" % self.hostname
1983 if self.current_task():
1984 self.current_task().requeue()
1985
showard6ae5ea92009-02-25 00:11:51 +00001986
jadmanski0afbb632008-06-06 21:10:57 +00001987 def set_status(self,status):
1988 print '%s -> %s' % (self.hostname, status)
1989 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00001990
1991
showard170873e2009-01-07 00:22:26 +00001992 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00001993 """
showard170873e2009-01-07 00:22:26 +00001994 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00001995 """
1996 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00001997 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00001998 FROM labels
1999 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002000 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002001 ORDER BY labels.name
2002 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002003 platform = None
2004 all_labels = []
2005 for label_name, is_platform in rows:
2006 if is_platform:
2007 platform = label_name
2008 all_labels.append(label_name)
2009 return platform, all_labels
2010
2011
2012 def reverify_tasks(self):
2013 cleanup_task = CleanupTask(host=self)
2014 verify_task = VerifyTask(host=self)
2015 # just to make sure this host does not get taken away
2016 self.set_status('Cleaning')
2017 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002018
2019
mbligh36768f02008-02-22 18:28:33 +00002020class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002021 _table_name = 'host_queue_entries'
2022 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002023 'active', 'complete', 'deleted', 'execution_subdir',
2024 'atomic_group_id')
showard6ae5ea92009-02-25 00:11:51 +00002025
2026
showarda3c58572009-03-12 20:36:59 +00002027 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002028 assert id or row
showarda3c58572009-03-12 20:36:59 +00002029 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002030 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002031
jadmanski0afbb632008-06-06 21:10:57 +00002032 if self.host_id:
2033 self.host = Host(self.host_id)
2034 else:
2035 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002036
showard170873e2009-01-07 00:22:26 +00002037 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002038 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002039
2040
showard89f84db2009-03-12 20:39:13 +00002041 @classmethod
2042 def clone(cls, template):
2043 """
2044 Creates a new row using the values from a template instance.
2045
2046 The new instance will not exist in the database or have a valid
2047 id attribute until its save() method is called.
2048 """
2049 assert isinstance(template, cls)
2050 new_row = [getattr(template, field) for field in cls._fields]
2051 clone = cls(row=new_row, new_record=True)
2052 clone.id = None
2053 return clone
2054
2055
    def _view_job_url(self):
        # URL fragment that routes to the frontend's "view job" tab.
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2058
2059
jadmanski0afbb632008-06-06 21:10:57 +00002060 def set_host(self, host):
2061 if host:
2062 self.queue_log_record('Assigning host ' + host.hostname)
2063 self.update_field('host_id', host.id)
2064 self.update_field('active', True)
2065 self.block_host(host.id)
2066 else:
2067 self.queue_log_record('Releasing host')
2068 self.unblock_host(self.host.id)
2069 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002070
jadmanski0afbb632008-06-06 21:10:57 +00002071 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002072
2073
    def get_host(self):
        # self.host is a Host instance or None (set in __init__/set_host).
        return self.host
mbligh36768f02008-02-22 18:28:33 +00002076
2077
jadmanski0afbb632008-06-06 21:10:57 +00002078 def queue_log_record(self, log_line):
2079 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002080 _drone_manager.write_lines_to_file(self.queue_log_path,
2081 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002082
2083
jadmanski0afbb632008-06-06 21:10:57 +00002084 def block_host(self, host_id):
2085 print "creating block %s/%s" % (self.job.id, host_id)
2086 row = [0, self.job.id, host_id]
2087 block = IneligibleHostQueue(row=row, new_record=True)
2088 block.save()
mblighe2586682008-02-29 22:45:46 +00002089
2090
jadmanski0afbb632008-06-06 21:10:57 +00002091 def unblock_host(self, host_id):
2092 print "removing block %s/%s" % (self.job.id, host_id)
2093 blocks = IneligibleHostQueue.fetch(
2094 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2095 for block in blocks:
2096 block.delete()
mblighe2586682008-02-29 22:45:46 +00002097
2098
showard2bab8f42008-11-12 18:15:22 +00002099 def set_execution_subdir(self, subdir=None):
2100 if subdir is None:
2101 assert self.get_host()
2102 subdir = self.get_host().hostname
2103 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002104
2105
showard6355f6b2008-12-05 18:52:13 +00002106 def _get_hostname(self):
2107 if self.host:
2108 return self.host.hostname
2109 return 'no host'
2110
2111
showard170873e2009-01-07 00:22:26 +00002112 def __str__(self):
2113 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2114
2115
    def set_status(self, status):
        """
        Update this entry's status field, keeping the denormalized
        active/complete flags consistent and sending any configured
        notification emails.
        """
        abort_statuses = ['Abort', 'Aborting', 'Aborted']
        if status not in abort_statuses:
            # refuse to overwrite an abort in progress with a normal status
            condition = ' AND '.join(['status <> "%s"' % x
                                      for x in abort_statuses])
        else:
            condition = ''
        self.update_field('status', status, condition=condition)

        print "%s -> %s" % (self, self.status)

        # not-yet-started states: neither active nor complete
        if status in ['Queued', 'Parsing']:
            self.update_field('complete', False)
            self.update_field('active', False)

        # in-progress states: active but not complete
        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Aborting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        # terminal states: complete and no longer active
        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)

        # per-status emails go out when this status (or 'all') is listed in
        # the configured notify statuses
        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()
2146
2147
2148 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002149 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002150
2151 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2152 self.job.id, self.job.name, hostname, status)
2153 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2154 self.job.id, self.job.name, hostname, status,
2155 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002156 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002157
2158
    def _email_on_job_complete(self):
        """
        Once every entry of the job is complete, email a per-host summary
        and overall status counts to the job's email list.
        """
        if not self.job.is_finished():
            return

        # one summary line per queue entry of the job
        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        # aggregate counts, e.g. "2 Completed, 1 Failed"
        status_counts = models.Job.objects.get_status_counts(
            [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
            self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
            self.job.id, self.job.name, status, self._view_job_url(),
            summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002182
2183
    def run(self, assigned_host=None):
        """
        Start this queue entry.  Metahost and atomic-group entries must be
        given the concrete host to run on via assigned_host.  Returns the
        result of self.job.run() for this entry.
        """
        if self.meta_host is not None or self.atomic_group_id is not None:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.set_host(assigned_host)

        print "%s/%s/%s scheduled on %s, status=%s" % (
            self.job.name, self.meta_host, self.atomic_group_id,
            self.host.hostname, self.status)

        return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002195
showard6ae5ea92009-02-25 00:11:51 +00002196
    def requeue(self):
        # Return this entry to the queue so the scheduler can pick it up
        # again.
        self.set_status('Queued')
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        # metahost entries release their host so a new one can be matched
        if self.meta_host:
            self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002203
2204
    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        # only entries tied to a specific host end up here; metahost
        # entries get requeued instead
        assert not self.meta_host
        self.set_status('Failed')
        # the job's other entries may now need to be stopped if synch_count
        # can no longer be satisfied
        self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002213
2214
    @property
    def aborted_by(self):
        # Login of the user who aborted this entry, or None if it was
        # never aborted (lazily loaded and cached by _load_abort_info).
        self._load_abort_info()
        return self._aborted_by
2219
2220
    @property
    def aborted_on(self):
        # Timestamp of when this entry was aborted, or None if it was
        # never aborted (lazily loaded and cached by _load_abort_info).
        self._load_abort_info()
        return self._aborted_on
2225
2226
2227 def _load_abort_info(self):
2228 """ Fetch info about who aborted the job. """
2229 if hasattr(self, "_aborted_by"):
2230 return
2231 rows = _db.execute("""
2232 SELECT users.login, aborted_host_queue_entries.aborted_on
2233 FROM aborted_host_queue_entries
2234 INNER JOIN users
2235 ON users.id = aborted_host_queue_entries.aborted_by_id
2236 WHERE aborted_host_queue_entries.queue_entry_id = %s
2237 """, (self.id,))
2238 if rows:
2239 self._aborted_by, self._aborted_on = rows[0]
2240 else:
2241 self._aborted_by = self._aborted_on = None
2242
2243
showardb2e2c322008-10-14 17:33:55 +00002244 def on_pending(self):
2245 """
2246 Called when an entry in a synchronous job has passed verify. If the
2247 job is ready to run, returns an agent to run the job. Returns None
2248 otherwise.
2249 """
2250 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002251 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002252 if self.job.is_ready():
2253 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002254 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002255 return None
2256
2257
showard170873e2009-01-07 00:22:26 +00002258 def abort(self, dispatcher, agents_to_abort=[]):
showard1be97432008-10-17 15:30:45 +00002259 host = self.get_host()
showard9d9ffd52008-11-09 23:14:35 +00002260 if self.active and host:
showard170873e2009-01-07 00:22:26 +00002261 dispatcher.add_agent(Agent(tasks=host.reverify_tasks()))
showard1be97432008-10-17 15:30:45 +00002262
showard170873e2009-01-07 00:22:26 +00002263 abort_task = AbortTask(self, agents_to_abort)
showard1be97432008-10-17 15:30:45 +00002264 self.set_status('Aborting')
showard170873e2009-01-07 00:22:26 +00002265 dispatcher.add_agent(Agent(tasks=[abort_task], num_processes=0))
2266
2267 def execution_tag(self):
2268 assert self.execution_subdir
2269 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002270
2271
mbligh36768f02008-02-22 18:28:33 +00002272class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002273 _table_name = 'jobs'
2274 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2275 'control_type', 'created_on', 'synch_count', 'timeout',
2276 'run_verify', 'email_list', 'reboot_before', 'reboot_after')
2277
2278
showarda3c58572009-03-12 20:36:59 +00002279 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002280 assert id or row
showarda3c58572009-03-12 20:36:59 +00002281 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002282
mblighe2586682008-02-29 22:45:46 +00002283
jadmanski0afbb632008-06-06 21:10:57 +00002284 def is_server_job(self):
2285 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002286
2287
showard170873e2009-01-07 00:22:26 +00002288 def tag(self):
2289 return "%s-%s" % (self.id, self.owner)
2290
2291
jadmanski0afbb632008-06-06 21:10:57 +00002292 def get_host_queue_entries(self):
2293 rows = _db.execute("""
2294 SELECT * FROM host_queue_entries
2295 WHERE job_id= %s
2296 """, (self.id,))
2297 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002298
jadmanski0afbb632008-06-06 21:10:57 +00002299 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002300
jadmanski0afbb632008-06-06 21:10:57 +00002301 return entries
mbligh36768f02008-02-22 18:28:33 +00002302
2303
jadmanski0afbb632008-06-06 21:10:57 +00002304 def set_status(self, status, update_queues=False):
2305 self.update_field('status',status)
2306
2307 if update_queues:
2308 for queue_entry in self.get_host_queue_entries():
2309 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002310
2311
jadmanski0afbb632008-06-06 21:10:57 +00002312 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002313 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2314 status='Pending')
2315 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002316
2317
jadmanski0afbb632008-06-06 21:10:57 +00002318 def num_machines(self, clause = None):
2319 sql = "job_id=%s" % self.id
2320 if clause:
2321 sql += " AND (%s)" % clause
2322 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002323
2324
jadmanski0afbb632008-06-06 21:10:57 +00002325 def num_queued(self):
2326 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002327
2328
jadmanski0afbb632008-06-06 21:10:57 +00002329 def num_active(self):
2330 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002331
2332
jadmanski0afbb632008-06-06 21:10:57 +00002333 def num_complete(self):
2334 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002335
2336
jadmanski0afbb632008-06-06 21:10:57 +00002337 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002338 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002339
mbligh36768f02008-02-22 18:28:33 +00002340
showard6bb7c292009-01-30 01:44:51 +00002341 def _not_yet_run_entries(self, include_verifying=True):
2342 statuses = [models.HostQueueEntry.Status.QUEUED,
2343 models.HostQueueEntry.Status.PENDING]
2344 if include_verifying:
2345 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2346 return models.HostQueueEntry.objects.filter(job=self.id,
2347 status__in=statuses)
2348
2349
2350 def _stop_all_entries(self):
2351 entries_to_stop = self._not_yet_run_entries(
2352 include_verifying=False)
2353 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002354 assert not child_entry.complete, (
2355 '%s status=%s, active=%s, complete=%s' %
2356 (child_entry.id, child_entry.status, child_entry.active,
2357 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002358 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2359 child_entry.host.status = models.Host.Status.READY
2360 child_entry.host.save()
2361 child_entry.status = models.HostQueueEntry.Status.STOPPED
2362 child_entry.save()
2363
showard2bab8f42008-11-12 18:15:22 +00002364 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002365 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002366 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002367 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002368
2369
jadmanski0afbb632008-06-06 21:10:57 +00002370 def write_to_machines_file(self, queue_entry):
2371 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002372 file_path = os.path.join(self.tag(), '.machines')
2373 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002374
2375
showard2bab8f42008-11-12 18:15:22 +00002376 def _next_group_name(self):
2377 query = models.HostQueueEntry.objects.filter(
2378 job=self.id).values('execution_subdir').distinct()
2379 subdirs = (entry['execution_subdir'] for entry in query)
2380 groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
2381 ids = [int(match.group(1)) for match in groups if match]
2382 if ids:
2383 next_id = max(ids) + 1
2384 else:
2385 next_id = 0
2386 return "group%d" % next_id
2387
2388
showard170873e2009-01-07 00:22:26 +00002389 def _write_control_file(self, execution_tag):
2390 control_path = _drone_manager.attach_file_to_execution(
2391 execution_tag, self.control_file)
2392 return control_path
mbligh36768f02008-02-22 18:28:33 +00002393
showardb2e2c322008-10-14 17:33:55 +00002394
showard2bab8f42008-11-12 18:15:22 +00002395 def get_group_entries(self, queue_entry_from_group):
2396 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002397 return list(HostQueueEntry.fetch(
2398 where='job_id=%s AND execution_subdir=%s',
2399 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002400
2401
showardb2e2c322008-10-14 17:33:55 +00002402 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002403 assert queue_entries
2404 execution_tag = queue_entries[0].execution_tag()
2405 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002406 hostnames = ','.join([entry.get_host().hostname
2407 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002408
showard170873e2009-01-07 00:22:26 +00002409 params = [_autoserv_path, '-P', execution_tag, '-p', '-n',
2410 '-r', _drone_manager.absolute_path(execution_tag),
2411 '-u', self.owner, '-l', self.name, '-m', hostnames,
2412 _drone_manager.absolute_path(control_path)]
mbligh36768f02008-02-22 18:28:33 +00002413
jadmanski0afbb632008-06-06 21:10:57 +00002414 if not self.is_server_job():
2415 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002416
showardb2e2c322008-10-14 17:33:55 +00002417 return params
mblighe2586682008-02-29 22:45:46 +00002418
mbligh36768f02008-02-22 18:28:33 +00002419
showardc9ae1782009-01-30 01:42:37 +00002420 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002421 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002422 return True
showard0fc38302008-10-23 00:44:07 +00002423 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002424 return queue_entry.get_host().dirty
2425 return False
showard21baa452008-10-21 00:08:39 +00002426
showardc9ae1782009-01-30 01:42:37 +00002427
2428 def _should_run_verify(self, queue_entry):
2429 do_not_verify = (queue_entry.host.protection ==
2430 host_protections.Protection.DO_NOT_VERIFY)
2431 if do_not_verify:
2432 return False
2433 return self.run_verify
2434
2435
2436 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002437 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002438 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002439 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002440 if self._should_run_verify(queue_entry):
2441 tasks.append(VerifyTask(queue_entry=queue_entry))
2442 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002443 return tasks
2444
2445
showard2bab8f42008-11-12 18:15:22 +00002446 def _assign_new_group(self, queue_entries):
2447 if len(queue_entries) == 1:
2448 group_name = queue_entries[0].get_host().hostname
2449 else:
2450 group_name = self._next_group_name()
2451 print 'Running synchronous job %d hosts %s as %s' % (
2452 self.id, [entry.host.hostname for entry in queue_entries],
2453 group_name)
2454
2455 for queue_entry in queue_entries:
2456 queue_entry.set_execution_subdir(group_name)
2457
2458
2459 def _choose_group_to_run(self, include_queue_entry):
2460 chosen_entries = [include_queue_entry]
2461
2462 num_entries_needed = self.synch_count - 1
2463 if num_entries_needed > 0:
2464 pending_entries = HostQueueEntry.fetch(
2465 where='job_id = %s AND status = "Pending" AND id != %s',
2466 params=(self.id, include_queue_entry.id))
2467 chosen_entries += list(pending_entries)[:num_entries_needed]
2468
2469 self._assign_new_group(chosen_entries)
2470 return chosen_entries
2471
2472
2473 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002474 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002475 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2476 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002477
showard2bab8f42008-11-12 18:15:22 +00002478 queue_entries = self._choose_group_to_run(queue_entry)
2479 return self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00002480
2481
2482 def _finish_run(self, queue_entries, initial_tasks=[]):
showardb2ccdda2008-10-28 20:39:05 +00002483 for queue_entry in queue_entries:
2484 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002485 params = self._get_autoserv_params(queue_entries)
2486 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2487 cmd=params)
2488 tasks = initial_tasks + [queue_task]
2489 entry_ids = [entry.id for entry in queue_entries]
2490
showard170873e2009-01-07 00:22:26 +00002491 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002492
2493
if __name__ == '__main__':
    # script entry point; main() is defined elsewhere in this module
    main()