blob: 236f5c2a3d05f8264354b8936d9bd291515e045a [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard89f84db2009-03-12 20:39:13 +00008import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard21baa452008-10-21 00:08:39 +000017from autotest_lib.frontend.afe import models
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
showardd1ee1dd2009-01-07 21:33:08 +000019from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000020
mblighb090f142008-02-27 21:33:46 +000021
# Directory where job results are written; overridden by the results_dir
# command-line argument in main().
RESULTS_DIR = '.'
# nice(2) level applied to spawned autoserv processes.
AUTOSERV_NICE_LEVEL = 10
# global_config section holding the AFE database connection settings.
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

# Default to the checkout this file lives in; AUTOTEST_DIR env var wins.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

# Make server-side autotest modules importable.
if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server.  Full results may not be available.  Sorry.
"""

# Module-global scheduler state, initialized in init()/main().
_db = None                   # DatabaseConnection, set up in init()
_shutdown = False            # flipped to True by handle_sigint()
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False        # set by --test; swaps in dummy autoserv
_base_url = None             # AFE frontend URL used in notification emails
_notify_email_statuses = []  # statuses that trigger notification emails
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
if not os.environ.has_key('AUTOTEST_SCHEDULER_LOG_DIR'):
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
if os.environ.has_key('AUTOTEST_SCHEDULER_LOG_NAME'):
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

# The .ini file reads the environment variables exported above.
logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
68
mbligh36768f02008-02-22 18:28:33 +000069
def main():
    """
    Scheduler entry point.

    Parses command-line options, loads notification/email and base-URL
    settings from the global config, starts the status server, then runs
    the Dispatcher tick loop until _shutdown is set (by SIGINT).  Any
    uncaught exception is emailed via email_manager before the cleanup
    path (queued emails, status server, drone manager, DB) runs.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to.  Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # Config value is a free-form list; accept space/comma/semicolon/colon
    # separators and drop empty tokens.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # Bare except is deliberate: email the stacktrace for ANY failure
        # (including SystemExit/KeyboardInterrupt) and fall through to the
        # cleanup path below instead of dying mid-cycle.
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000145
146
def handle_sigint(signum, frame):
    """SIGINT handler: flag the dispatcher loop in main() to exit cleanly."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000151
152
def init(logfile):
    """
    One-time scheduler initialization, called from main().

    Order matters here: optional stdout/stderr redirection first, then PID
    file, test-mode DB override, PATH setup, DB connect, Django autocommit,
    SIGINT handler, and finally drone manager initialization.

    @param logfile - Path to redirect stdout/stderr to, or None/'' to skip.
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # point the scheduler at the stress-test database instead of the
        # production AFE database
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    # make autoserv (and friends) resolvable by bare name in subprocesses
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000184
185
def enable_logging(logfile):
    """
    Redirect this process's stdout to `logfile` and stderr to
    `logfile`.err, both opened unbuffered in append mode.  The underlying
    file descriptors are dup2()'d as well so child processes inherit the
    redirection.
    """
    stdout_path = logfile
    stderr_path = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", stdout_path, stderr_path)
    stdout_file = open(stdout_path, "a", buffering=0)
    stderr_file = open(stderr_path, "a", buffering=0)

    # Redirect at the fd level first, then swap the Python-level objects.
    os.dup2(stdout_file.fileno(), sys.stdout.fileno())
    os.dup2(stderr_file.fileno(), sys.stderr.fileno())

    sys.stdout = stdout_file
    sys.stderr = stderr_file
mbligh36768f02008-02-22 18:28:33 +0000198
199
def queue_entries_to_abort():
    """Return a HostQueueEntry for every row currently in status 'Abort'."""
    rows = _db.execute("""
                    SELECT * FROM host_queue_entries WHERE status='Abort';
                    """)

    return [HostQueueEntry(row=row) for row in rows]
mbligh36768f02008-02-22 18:28:33 +0000207
showard7cf9a9b2008-05-15 21:15:52 +0000208
class SchedulerError(Exception):
    """
    Raised by HostScheduler when an inconsistent scheduling state occurs
    (e.g. a host carrying more than one atomic group label).
    """
212
class HostScheduler(object):
    """
    Matches pending HostQueueEntries to ready Hosts.

    refresh() snapshots scheduling state (ready hosts, job/host ACLs,
    labels, dependencies) from the database into in-memory dicts; the
    find_* / _schedule_* methods then consume and mutate that cached state
    over the course of a single scheduling cycle, popping hosts from
    _hosts_available as they are handed out.
    """

    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        # render ids as a comma-separated string suitable for SQL "IN (%s)"
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """
        Execute `query` (which contains a single %s placeholder for an id
        list) and fold the two-column result into a dict of
        left id -> set(right ids); columns are swapped when flip=True.
        Returns {} without touching the DB if id_list is empty.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        # Build {left_id: set(right_ids)} from two-column rows.
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map each job id to the set of ACL group ids of the job's owner."""
        query = """
            SELECT jobs.id, acl_groups_users.aclgroup_id
            FROM jobs
            INNER JOIN users ON users.login = jobs.owner
            INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
            WHERE jobs.id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map each job id to the set of host ids it may NOT run on."""
        query = """
            SELECT job_id, host_id
            FROM ineligible_host_queues
            WHERE job_id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map each job id to the set of label ids it depends on."""
        query = """
            SELECT job_id, label_id
            FROM jobs_dependency_labels
            WHERE job_id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map each host id to the set of ACL group ids it belongs to."""
        query = """
            SELECT host_id, aclgroup_id
            FROM acl_groups_hosts
            WHERE host_id IN (%s)
            """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """
        Return (labels_to_hosts, hosts_to_labels) dicts of id sets for the
        given hosts, or ({}, {}) when host_ids is empty.
        """
        if not host_ids:
            return {}, {}
        query = """
            SELECT label_id, host_id
            FROM hosts_labels
            WHERE host_id IN (%s)
            """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        # all labels, keyed by id (fetched unfiltered)
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """
        Re-snapshot all scheduling state from the database for this cycle,
        restricted to the jobs referenced by pending_queue_entries.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        # job and host must share at least one ACL group
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        # True iff the host carries every label the job depends on
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """
        Reject hosts that carry an only_if_needed label unless the job
        explicitly asked for it (as a dependency or as the metahost label).
        """
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """Combine ACL, dependency, only_if_needed and atomic-group checks."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        # entry names a specific host; claim it if it's eligible and ready
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """
        Pick and claim the first usable, eligible host carrying the
        entry's metahost label, or return None if none qualifies.
        """
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """
        Return an available Host for this (non-atomic-group) queue entry,
        claiming it from the cycle's cached state, or None.
        """
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d.  Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = random.sample(
                        eligible_hosts_in_group, max_hosts)

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
557
558
showard170873e2009-01-07 00:22:26 +0000559class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000560 def __init__(self):
561 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000562 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000563 self._host_scheduler = HostScheduler()
showard170873e2009-01-07 00:22:26 +0000564 self._host_agents = {}
565 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000566
mbligh36768f02008-02-22 18:28:33 +0000567
jadmanski0afbb632008-06-06 21:10:57 +0000568 def do_initial_recovery(self, recover_hosts=True):
569 # always recover processes
570 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000571
jadmanski0afbb632008-06-06 21:10:57 +0000572 if recover_hosts:
573 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000574
575
jadmanski0afbb632008-06-06 21:10:57 +0000576 def tick(self):
showard170873e2009-01-07 00:22:26 +0000577 _drone_manager.refresh()
showarda3ab0d52008-11-03 19:03:47 +0000578 self._run_cleanup_maybe()
jadmanski0afbb632008-06-06 21:10:57 +0000579 self._find_aborting()
580 self._schedule_new_jobs()
581 self._handle_agents()
showard170873e2009-01-07 00:22:26 +0000582 _drone_manager.execute_actions()
583 email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000584
showard97aed502008-11-04 02:01:24 +0000585
showarda3ab0d52008-11-03 19:03:47 +0000586 def _run_cleanup_maybe(self):
showardd1ee1dd2009-01-07 21:33:08 +0000587 should_cleanup = (self._last_clean_time +
588 scheduler_config.config.clean_interval * 60 <
589 time.time())
590 if should_cleanup:
showardb18134f2009-03-20 20:52:18 +0000591 logging.info('Running cleanup')
showarda3ab0d52008-11-03 19:03:47 +0000592 self._abort_timed_out_jobs()
593 self._abort_jobs_past_synch_start_timeout()
594 self._clear_inactive_blocks()
showardfa8629c2008-11-04 16:51:23 +0000595 self._check_for_db_inconsistencies()
showarda3ab0d52008-11-03 19:03:47 +0000596 self._last_clean_time = time.time()
597
mbligh36768f02008-02-22 18:28:33 +0000598
showard170873e2009-01-07 00:22:26 +0000599 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
600 for object_id in object_ids:
601 agent_dict.setdefault(object_id, set()).add(agent)
602
603
604 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
605 for object_id in object_ids:
606 assert object_id in agent_dict
607 agent_dict[object_id].remove(agent)
608
609
jadmanski0afbb632008-06-06 21:10:57 +0000610 def add_agent(self, agent):
611 self._agents.append(agent)
612 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000613 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
614 self._register_agent_for_ids(self._queue_entry_agents,
615 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000616
showard170873e2009-01-07 00:22:26 +0000617
618 def get_agents_for_entry(self, queue_entry):
619 """
620 Find agents corresponding to the specified queue_entry.
621 """
622 return self._queue_entry_agents.get(queue_entry.id, set())
623
624
625 def host_has_agent(self, host):
626 """
627 Determine if there is currently an Agent present using this host.
628 """
629 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000630
631
jadmanski0afbb632008-06-06 21:10:57 +0000632 def remove_agent(self, agent):
633 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000634 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
635 agent)
636 self._unregister_agent_for_ids(self._queue_entry_agents,
637 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000638
639
showard4c5374f2008-09-04 17:02:56 +0000640 def num_running_processes(self):
641 return sum(agent.num_processes for agent in self._agents
642 if agent.is_running())
mblighbb421852008-03-11 22:36:16 +0000643
644
showard170873e2009-01-07 00:22:26 +0000645 def _extract_execution_tag(self, command_line):
646 match = re.match(r'.* -P (\S+) ', command_line)
647 if not match:
648 return None
649 return match.group(1)
mblighbb421852008-03-11 22:36:16 +0000650
651
showard2bab8f42008-11-12 18:15:22 +0000652 def _recover_queue_entries(self, queue_entries, run_monitor):
653 assert len(queue_entries) > 0
showard2bab8f42008-11-12 18:15:22 +0000654 queue_task = RecoveryQueueTask(job=queue_entries[0].job,
655 queue_entries=queue_entries,
656 run_monitor=run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +0000657 self.add_agent(Agent(tasks=[queue_task],
showard170873e2009-01-07 00:22:26 +0000658 num_processes=len(queue_entries)))
mblighbb421852008-03-11 22:36:16 +0000659
660
jadmanski0afbb632008-06-06 21:10:57 +0000661 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000662 self._register_pidfiles()
663 _drone_manager.refresh()
664 self._recover_running_entries()
665 self._recover_aborting_entries()
666 self._requeue_other_active_entries()
667 self._recover_parsing_entries()
668 self._reverify_remaining_hosts()
669 # reinitialize drones after killing orphaned processes, since they can
670 # leave around files when they die
671 _drone_manager.execute_actions()
672 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000673
showard170873e2009-01-07 00:22:26 +0000674
675 def _register_pidfiles(self):
676 # during recovery we may need to read pidfiles for both running and
677 # parsing entries
678 queue_entries = HostQueueEntry.fetch(
679 where="status IN ('Running', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000680 for queue_entry in queue_entries:
showard170873e2009-01-07 00:22:26 +0000681 pidfile_id = _drone_manager.get_pidfile_id_from(
682 queue_entry.execution_tag())
683 _drone_manager.register_pidfile(pidfile_id)
684
685
686 def _recover_running_entries(self):
687 orphans = _drone_manager.get_orphaned_autoserv_processes()
688
689 queue_entries = HostQueueEntry.fetch(where="status = 'Running'")
690 requeue_entries = []
691 for queue_entry in queue_entries:
692 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000693 # synchronous job we've already recovered
694 continue
showard170873e2009-01-07 00:22:26 +0000695 execution_tag = queue_entry.execution_tag()
696 run_monitor = PidfileRunMonitor()
697 run_monitor.attach_to_existing_process(execution_tag)
698 if not run_monitor.has_process():
699 # autoserv apparently never got run, so let it get requeued
700 continue
showarde788ea62008-11-17 21:02:47 +0000701 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showardb18134f2009-03-20 20:52:18 +0000702 logging.info('Recovering %s (process %s)',
703 (', '.join(str(entry) for entry in queue_entries),
704 run_monitor.get_process()))
showard2bab8f42008-11-12 18:15:22 +0000705 self._recover_queue_entries(queue_entries, run_monitor)
showard170873e2009-01-07 00:22:26 +0000706 orphans.pop(execution_tag, None)
mbligh90a549d2008-03-25 23:52:34 +0000707
jadmanski0afbb632008-06-06 21:10:57 +0000708 # now kill any remaining autoserv processes
showard170873e2009-01-07 00:22:26 +0000709 for process in orphans.itervalues():
showardb18134f2009-03-20 20:52:18 +0000710 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000711 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000712
showard170873e2009-01-07 00:22:26 +0000713
714 def _recover_aborting_entries(self):
715 queue_entries = HostQueueEntry.fetch(
716 where='status IN ("Abort", "Aborting")')
jadmanski0afbb632008-06-06 21:10:57 +0000717 for queue_entry in queue_entries:
showardb18134f2009-03-20 20:52:18 +0000718 logging.info('Recovering aborting QE %s', queue_entry)
showard170873e2009-01-07 00:22:26 +0000719 agent = queue_entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000720
showard97aed502008-11-04 02:01:24 +0000721
showard170873e2009-01-07 00:22:26 +0000722 def _requeue_other_active_entries(self):
723 queue_entries = HostQueueEntry.fetch(
724 where='active AND NOT complete AND status != "Pending"')
725 for queue_entry in queue_entries:
726 if self.get_agents_for_entry(queue_entry):
727 # entry has already been recovered
728 continue
showardb18134f2009-03-20 20:52:18 +0000729 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
730 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000731 if queue_entry.host:
732 tasks = queue_entry.host.reverify_tasks()
733 self.add_agent(Agent(tasks))
734 agent = queue_entry.requeue()
735
736
    def _reverify_remaining_hosts(self):
        """
        Schedule reverify tasks for hosts left in transient states by a
        previous scheduler run.
        """
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000752
753
jadmanski0afbb632008-06-06 21:10:57 +0000754 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000755 print_message='Reverifying host %s'):
756 full_where='locked = 0 AND invalid = 0 AND ' + where
757 for host in Host.fetch(where=full_where):
758 if self.host_has_agent(host):
759 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000760 continue
showard170873e2009-01-07 00:22:26 +0000761 if print_message:
showardb18134f2009-03-20 20:52:18 +0000762 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000763 tasks = host.reverify_tasks()
764 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000765
766
showard97aed502008-11-04 02:01:24 +0000767 def _recover_parsing_entries(self):
showard2bab8f42008-11-12 18:15:22 +0000768 recovered_entry_ids = set()
showard97aed502008-11-04 02:01:24 +0000769 for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
showard2bab8f42008-11-12 18:15:22 +0000770 if entry.id in recovered_entry_ids:
771 continue
772 queue_entries = entry.job.get_group_entries(entry)
showard170873e2009-01-07 00:22:26 +0000773 recovered_entry_ids = recovered_entry_ids.union(
774 entry.id for entry in queue_entries)
showardb18134f2009-03-20 20:52:18 +0000775 logging.info('Recovering parsing entries %s',
776 (', '.join(str(entry) for entry in queue_entries)))
showard97aed502008-11-04 02:01:24 +0000777
778 reparse_task = FinalReparseTask(queue_entries)
showard170873e2009-01-07 00:22:26 +0000779 self.add_agent(Agent([reparse_task], num_processes=0))
showard97aed502008-11-04 02:01:24 +0000780
781
jadmanski0afbb632008-06-06 21:10:57 +0000782 def _recover_hosts(self):
783 # recover "Repair Failed" hosts
784 message = 'Reverifying dead host %s'
785 self._reverify_hosts_where("status = 'Repair Failed'",
786 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000787
788
showard3bb499f2008-07-03 19:42:20 +0000789 def _abort_timed_out_jobs(self):
790 """
791 Aborts all jobs that have timed out and not completed
792 """
showarda3ab0d52008-11-03 19:03:47 +0000793 query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
794 where=['created_on + INTERVAL timeout HOUR < NOW()'])
795 for job in query.distinct():
showardb18134f2009-03-20 20:52:18 +0000796 logging.warning('Aborting job %d due to job timeout', job.id)
showarda3ab0d52008-11-03 19:03:47 +0000797 job.abort(None)
showard3bb499f2008-07-03 19:42:20 +0000798
799
showard98863972008-10-29 21:14:56 +0000800 def _abort_jobs_past_synch_start_timeout(self):
801 """
802 Abort synchronous jobs that are past the start timeout (from global
803 config) and are holding a machine that's in everyone.
804 """
805 timeout_delta = datetime.timedelta(
showardd1ee1dd2009-01-07 21:33:08 +0000806 minutes=scheduler_config.config.synch_job_start_timeout_minutes)
showard98863972008-10-29 21:14:56 +0000807 timeout_start = datetime.datetime.now() - timeout_delta
808 query = models.Job.objects.filter(
showard98863972008-10-29 21:14:56 +0000809 created_on__lt=timeout_start,
810 hostqueueentry__status='Pending',
showardd9ac4452009-02-07 02:04:37 +0000811 hostqueueentry__host__aclgroup__name='Everyone')
showard98863972008-10-29 21:14:56 +0000812 for job in query.distinct():
showardb18134f2009-03-20 20:52:18 +0000813 logging.warning('Aborting job %d due to start timeout', job.id)
showardff059d72008-12-03 18:18:53 +0000814 entries_to_abort = job.hostqueueentry_set.exclude(
815 status=models.HostQueueEntry.Status.RUNNING)
816 for queue_entry in entries_to_abort:
817 queue_entry.abort(None)
showard98863972008-10-29 21:14:56 +0000818
819
    def _clear_inactive_blocks(self):
        """
        Clear out blocks for all completed jobs.

        Deletes ineligible_host_queues rows whose job has no incomplete
        host_queue_entries left.
        """
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        _db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")
showard04c82c52008-05-29 19:38:12 +0000832
833
showardb95b1bd2008-08-15 18:11:04 +0000834 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000835 # prioritize by job priority, then non-metahost over metahost, then FIFO
836 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000837 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000838 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000839 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000840
841
showard89f84db2009-03-12 20:39:13 +0000842 def _refresh_pending_queue_entries(self):
843 """
844 Lookup the pending HostQueueEntries and call our HostScheduler
845 refresh() method given that list. Return the list.
846
847 @returns A list of pending HostQueueEntries sorted in priority order.
848 """
showard63a34772008-08-18 19:32:50 +0000849 queue_entries = self._get_pending_queue_entries()
850 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000851 return []
showardb95b1bd2008-08-15 18:11:04 +0000852
showard63a34772008-08-18 19:32:50 +0000853 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000854
showard89f84db2009-03-12 20:39:13 +0000855 return queue_entries
856
857
858 def _schedule_atomic_group(self, queue_entry):
859 """
860 Schedule the given queue_entry on an atomic group of hosts.
861
862 Returns immediately if there are insufficient available hosts.
863
864 Creates new HostQueueEntries based off of queue_entry for the
865 scheduled hosts and starts them all running.
866 """
867 # This is a virtual host queue entry representing an entire
868 # atomic group, find a group and schedule their hosts.
869 group_hosts = self._host_scheduler.find_eligible_atomic_group(
870 queue_entry)
871 if not group_hosts:
872 return
873 # The first assigned host uses the original HostQueueEntry
874 group_queue_entries = [queue_entry]
875 for assigned_host in group_hosts[1:]:
876 # Create a new HQE for every additional assigned_host.
877 new_hqe = HostQueueEntry.clone(queue_entry)
878 new_hqe.save()
879 group_queue_entries.append(new_hqe)
880 assert len(group_queue_entries) == len(group_hosts)
881 for queue_entry, host in itertools.izip(group_queue_entries,
882 group_hosts):
883 self._run_queue_entry(queue_entry, host)
884
885
886 def _schedule_new_jobs(self):
887 queue_entries = self._refresh_pending_queue_entries()
888 if not queue_entries:
889 return
890
showard63a34772008-08-18 19:32:50 +0000891 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000892 if (queue_entry.atomic_group_id is None or
893 queue_entry.host_id is not None):
894 assigned_host = self._host_scheduler.find_eligible_host(
895 queue_entry)
896 if assigned_host:
897 self._run_queue_entry(queue_entry, assigned_host)
898 else:
899 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000900
901
902 def _run_queue_entry(self, queue_entry, host):
903 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000904 # in some cases (synchronous jobs with run_verify=False), agent may be
905 # None
showard9976ce92008-10-15 20:28:13 +0000906 if agent:
907 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000908
909
jadmanski0afbb632008-06-06 21:10:57 +0000910 def _find_aborting(self):
jadmanski0afbb632008-06-06 21:10:57 +0000911 for entry in queue_entries_to_abort():
showard170873e2009-01-07 00:22:26 +0000912 agents_to_abort = list(self.get_agents_for_entry(entry))
showard1be97432008-10-17 15:30:45 +0000913 for agent in agents_to_abort:
914 self.remove_agent(agent)
915
showard170873e2009-01-07 00:22:26 +0000916 entry.abort(self, agents_to_abort)
jadmanski0afbb632008-06-06 21:10:57 +0000917
918
showard324bf812009-01-20 23:23:38 +0000919 def _can_start_agent(self, agent, num_started_this_cycle,
920 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000921 # always allow zero-process agents to run
922 if agent.num_processes == 0:
923 return True
924 # don't allow any nonzero-process agents to run after we've reached a
925 # limit (this avoids starvation of many-process agents)
926 if have_reached_limit:
927 return False
928 # total process throttling
showard324bf812009-01-20 23:23:38 +0000929 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000930 return False
931 # if a single agent exceeds the per-cycle throttling, still allow it to
932 # run when it's the first agent in the cycle
933 if num_started_this_cycle == 0:
934 return True
935 # per-cycle throttling
936 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000937 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000938 return False
939 return True
940
941
jadmanski0afbb632008-06-06 21:10:57 +0000942 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000943 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000944 have_reached_limit = False
945 # iterate over copy, so we can remove agents during iteration
946 for agent in list(self._agents):
947 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000948 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000949 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000950 continue
951 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000952 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000953 have_reached_limit):
954 have_reached_limit = True
955 continue
showard4c5374f2008-09-04 17:02:56 +0000956 num_started_this_cycle += agent.num_processes
957 agent.tick()
showardb18134f2009-03-20 20:52:18 +0000958 logging.info('%d running processes',
959 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000960
961
showardfa8629c2008-11-04 16:51:23 +0000962 def _check_for_db_inconsistencies(self):
963 query = models.HostQueueEntry.objects.filter(active=True, complete=True)
964 if query.count() != 0:
965 subject = ('%d queue entries found with active=complete=1'
966 % query.count())
967 message = '\n'.join(str(entry.get_object_dict())
968 for entry in query[:50])
969 if len(query) > 50:
970 message += '\n(truncated)\n'
971
showardb18134f2009-03-20 20:52:18 +0000972 logging.error(subject)
showard170873e2009-01-07 00:22:26 +0000973 email_manager.manager.enqueue_notify_email(subject, message)
showardfa8629c2008-11-04 16:51:23 +0000974
975
class PidfileRunMonitor(object):
    """
    Tracks an autoserv process through its pidfile via the drone manager.

    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # set True once we give up on the process (see on_lost_process())
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        """Wrap command in 'nice' when a truthy nice_level is given."""
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """Launch command via the drone manager and start monitoring it."""
        assert command is not None
        # use the helper instead of duplicating the nice-wrapping logic
        # inline (behavior unchanged: 'nice -n 0' was already a no-op)
        command = self._add_nice_command(command, nice_level)
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, log_file=log_file,
            pidfile_name=pidfile_name, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag):
        """Monitor an already-running process identified by execution_tag."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(execution_tag)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        """Refresh self._state from the pidfile; raise on invalid contents."""
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        # notify by email and treat the process as lost
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
         pid=None, exit_status=None if autoserv has not yet run
         pid!=None, exit_status=None if autoserv is running
         pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException:
            # the bound exception value was unused; the details reach the
            # notification through the formatted traceback instead
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
        self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001131
1132
class Agent(object):
    """
    Executes a FIFO queue of AgentTasks for the dispatcher.

    Records the union of host ids and queue entry ids touched by its tasks
    so the dispatcher can look agents up by host or entry.
    """
    def __init__(self, tasks, num_processes=1):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.num_processes = num_processes

        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        for task in tasks:
            self.add_task(task)


    def _union_ids(self, id_lists):
        # flatten the per-task id lists into a single set
        return set(itertools.chain(*id_lists))


    def add_task(self, task):
        task.agent = self
        self.queue.put_nowait(task)


    def tick(self):
        """Advance the active task, moving on through the queue as tasks
        finish; return as soon as a task is still in progress."""
        while not self.is_done():
            task = self.active_task
            if task is not None and not task.is_done():
                task.poll()
                if not task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        logging.info("agent picking task")
        finished_task = self.active_task
        if finished_task:
            assert finished_task.is_done()
            if not finished_task.success:
                self.on_task_failure()

        self.active_task = None
        if self.is_done():
            return
        self.active_task = self.queue.get_nowait()
        if self.active_task:
            self.active_task.start()


    def on_task_failure(self):
        # drop everything still queued; run failure tasks in a new Agent, so
        # host_ids and queue_entry_ids will get reset
        self.queue = Queue.Queue(0)
        failure_agent = Agent(self.active_task.failure_tasks)
        self.dispatcher.add_agent(failure_agent)


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        assert self.dispatcher
        self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001201
jadmanski0afbb632008-06-06 21:10:57 +00001202
class AgentTask(object):
    """
    Base class for a unit of work run by an Agent, typically backed by a
    subprocess watched through a PidfileRunMonitor.  Subclasses override
    prolog()/epilog()/run() as needed.
    """
    def __init__(self, cmd, working_directory=None, failure_tasks=None):
        """
        @param cmd: argv list to execute, or None for a no-op task.
        @param working_directory: directory the command runs in.
        @param failure_tasks: tasks to run (in a fresh Agent) if this one
                fails; defaults to none.
        """
        self.done = False
        # avoid the old mutable-default-argument bug: each instance must own
        # its failure_tasks list rather than share one class-wide list
        if failure_tasks is None:
            failure_tasks = []
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        # record which hosts/queue entries this task touches, for the
        # dispatcher's agent lookups
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            # no process was ever started; treat as failure
            self.finished(False)


    def tick(self, exit_code):
        # exit_code of None means the process is still running
        if exit_code is None:
            return
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def create_temp_resultsdir(self, suffix=''):
        # NOTE(review): suffix is currently ignored even though callers pass
        # values like '.repair' -- confirm whether it should be used
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        if self.monitor and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        # all entries in a group must share one execution tag
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_and_parse_results(self, queue_entries):
        """Copy results to the repository and kick off a final reparse."""
        assert len(queue_entries) > 0
        assert self.monitor
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(self.monitor.get_process(),
                                                  results_path)

        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def run(self):
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001325
1326
class RepairTask(AgentTask):
    """
    Runs autoserv -R to repair a host.  On failure, optionally fails out
    the queue entry that was waiting on the host.
    """
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        # translate the host's protection level into autoserv's normalized
        # attribute name
        protection_string = host_protections.Protection.get_string(
                host.protection)
        protection = host_protections.Protection.get_attr_name(
                protection_string)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.create_temp_resultsdir('.repair')
        repair_cmd = [_autoserv_path, '-p', '-R', '-m', host.hostname,
                      '-r', _drone_manager.absolute_path(self.temp_results_dir),
                      '--host-protection', protection]
        super(RepairTask, self).__init__(repair_cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        entry = self.queue_entry_to_fail
        if entry:
            entry.requeue()


    def _fail_queue_entry(self):
        entry = self.queue_entry_to_fail
        assert entry

        if entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        entry.update_from_database()
        if entry.status != 'Queued':
            return # entry has been aborted

        entry.set_execution_subdir()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
                self.monitor.get_process(),
                source_path=self.temp_results_dir + '/',
                destination_path=entry.execution_tag() + '/')

        self._copy_and_parse_results([entry])
        entry.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001387
1388
class PreJobTask(AgentTask):
    """Base class for tasks run against a host before a job (verify, cleanup)."""

    def epilog(self):
        """On failure of a non-metahost entry, archive this task's log file."""
        super(PreJobTask, self).epilog()
        entry = self.queue_entry
        if not entry or self.success or entry.meta_host:
            return
        entry.set_execution_subdir()
        log_name = os.path.basename(self.log_file)
        destination = os.path.join(entry.execution_tag(), log_name)
        _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file,
                destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001401
1402
class VerifyTask(PreJobTask):
    """Runs autoserv -v against a host, with a RepairTask as the fallback."""

    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry/host must be supplied
        assert bool(queue_entry) ^ bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        verify_cmd = [_autoserv_path, '-p', '-v', '-m', self.host.hostname,
                      '-r',
                      _drone_manager.absolute_path(self.temp_results_dir)]
        fallback = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(verify_cmd, self.temp_results_dir,
                                         failure_tasks=fallback)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        """Flag the host (and entry, if any) as 'Verifying'."""
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        """A successful verify makes the host 'Ready'."""
        super(VerifyTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001433
1434
class QueueTask(AgentTask):
    """
    Runs autoserv for a job across a set of host queue entries and manages
    the surrounding bookkeeping: keyval files, status updates, result
    copying/parsing and post-job host reboots.
    """
    def __init__(self, job, queue_entries, cmd):
        self.job = job
        self.queue_entries = queue_entries
        super(QueueTask, self).__init__(cmd, self._execution_tag())
        self._set_ids(queue_entries=queue_entries)


    def _format_keyval(self, key, value):
        """Render one keyval-file line (without trailing newline)."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Path of the job-level keyval file inside the execution directory."""
        return os.path.join(self._execution_tag(), 'keyval')


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict as a keyval file at keyval_path (pre-launch)."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_tag(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_keyval_after_job(self, field, value):
        """Append a single keyval to the job keyval file on the drone."""
        # requires a live process: the write is paired with it on the drone
        assert self.monitor and self.monitor.has_process()
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _write_host_keyvals(self, host):
        """Write per-host platform/label keyvals under host_keyvals/."""
        keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_tag(self):
        # all entries of a job execution share the same tag
        return self.queue_entries[0].execution_tag()


    def prolog(self):
        """Record queue time, mark entries/hosts Running, write machines file."""
        queued = int(time.mktime(self.job.created_on.timetuple()))
        self._write_keyvals_before_job({'job_queued': queued})
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        """Leave a job_failure marker when the autoserv process was lost."""
        error_file_path = os.path.join(self._execution_tag(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self, success):
        """
        Finalize the execution: record finish time and copy/parse results.

        NOTE(review): the `success` parameter is currently unused -- the
        outcome recorded here is derived from the monitor state instead.
        """
        if self.monitor.has_process():
            self._write_keyval_after_job("job_finished", int(time.time()))
            self._copy_and_parse_results(self.queue_entries)

        if self.monitor.lost_process:
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        """Append an INFO line to the job's status.log on the drone."""
        _drone_manager.write_lines_to_file(
            os.path.join(self._execution_tag(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job (keyvals + status.log comment)."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task(False)


    def _reboot_hosts(self):
        """Reboot or release each host per the job's reboot_after policy."""
        reboot_after = self.job.reboot_after
        do_reboot = False
        if reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            num_tests_failed = self.monitor.num_tests_failed()
            do_reboot = (self.success and num_tests_failed == 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.get_host())
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task(self.success)
        self._reboot_hosts()

        # fixed typo: was "succes=%s"
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001582
1583
class RecoveryQueueTask(QueueTask):
    """A QueueTask that re-attaches to an already-running autoserv process."""

    def __init__(self, job, queue_entries, run_monitor):
        super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
        self.run_monitor = run_monitor


    def run(self):
        # adopt the pre-existing monitor rather than launching a new process
        self.monitor = self.run_monitor


    def prolog(self):
        # recovering an existing process - don't do prolog
        pass
mblighbb421852008-03-11 22:36:16 +00001597
1598
class CleanupTask(PreJobTask):
    """Runs autoserv --cleanup on a host, falling back to repair on failure."""

    def __init__(self, host=None, queue_entry=None):
        # exactly one of host/queue_entry must be supplied
        assert bool(host) != bool(queue_entry)
        if queue_entry:
            host = queue_entry.get_host()
        self.queue_entry = queue_entry
        self.host = host

        self.create_temp_resultsdir('.cleanup')
        self.cmd = [_autoserv_path, '-p', '--cleanup', '-m', host.hostname,
                    '-r', _drone_manager.absolute_path(self.temp_results_dir)]
        fallback_repair = RepairTask(host, queue_entry=queue_entry)
        super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
                                          failure_tasks=[fallback_repair])

        self._set_ids(host=host, queue_entries=[queue_entry])
        self.set_host_log_file('cleanup', self.host)


    def prolog(self):
        """Flag the host as 'Cleaning'."""
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status("Cleaning")


    def epilog(self):
        """On success the host becomes Ready and is no longer dirty."""
        super(CleanupTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
            self.host.update_field('dirty', 0)
1629
1630
class AbortTask(AgentTask):
    """Aborts the active tasks of other agents and marks an entry Aborted."""

    def __init__(self, queue_entry, agents_to_abort):
        super(AbortTask, self).__init__('')
        self.queue_entry = queue_entry
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [queue_entry.id]
        self.agents_to_abort = agents_to_abort


    def prolog(self):
        logging.info("starting abort on host %s, job %s",
                     self.queue_entry.host_id, self.queue_entry.job_id)


    def epilog(self):
        super(AbortTask, self).epilog()
        self.queue_entry.set_status('Aborted')
        self.success = True


    def run(self):
        """Abort every agent's currently-active task."""
        for agent in self.agents_to_abort:
            task = agent.active_task
            if task:
                task.abort()
mbligh36768f02008-02-22 18:28:33 +00001655
1656
class FinalReparseTask(AgentTask):
    """
    Runs the results parser over a finished job execution and then moves
    the job's queue entries from Parsing to their final status.

    A class-level counter limits how many parser processes run at once
    (bounded by scheduler_config.config.max_parse_processes); instances
    that can't start yet keep retrying from poll().
    """
    # number of parser processes currently running, across all instances
    _num_running_parses = 0

    def __init__(self, queue_entries):
        self._queue_entries = queue_entries
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]
        self._parse_started = False

        assert len(queue_entries) > 0
        queue_entry = queue_entries[0]

        # all entries share one execution; parse that directory
        self._execution_tag = queue_entry.execution_tag()
        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
        # attach to the finished autoserv process to read its exit status
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
        self._final_status = self._determine_final_status()

        if _testing_mode:
            # NOTE(review): in testing mode AgentTask.__init__ is skipped
            # entirely and only self.cmd is set -- confirm the test harness
            # supplies the remaining AgentTask state.
            self.cmd = 'true'
        else:
            super(FinalReparseTask, self).__init__(
                cmd=self._generate_parse_command(),
                working_directory=self._execution_tag)

        self.log_file = os.path.join(self._execution_tag, '.parse.log')


    @classmethod
    def _increment_running_parses(cls):
        cls._num_running_parses += 1


    @classmethod
    def _decrement_running_parses(cls):
        cls._num_running_parses -= 1


    @classmethod
    def _can_run_new_parse(cls):
        # throttle: only start if we're under the configured parse limit
        return (cls._num_running_parses <
                scheduler_config.config.max_parse_processes)


    def _determine_final_status(self):
        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def prolog(self):
        """Move all entries into the Parsing state."""
        super(FinalReparseTask, self).prolog()
        for queue_entry in self._queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def epilog(self):
        """Move all entries to the final status computed at construction."""
        super(FinalReparseTask, self).epilog()
        for queue_entry in self._queue_entries:
            queue_entry.set_status(self._final_status)


    def _generate_parse_command(self):
        """Build the tko parser command line for this results directory."""
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                self._results_dir]


    def poll(self):
        # override poll to keep trying to start until the parse count goes down
        # and we can, at which point we revert to default behavior
        if self._parse_started:
            super(FinalReparseTask, self).poll()
        else:
            self._try_starting_parse()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_parse()


    def _try_starting_parse(self):
        """Launch the parser if under the limit and results are present."""
        if not self._can_run_new_parse():
            return

        # make sure we actually have results to parse
        # this should never happen in normal operation
        if not self._autoserv_monitor.has_process():
            email_manager.manager.enqueue_notify_email(
                'No results to parse',
                'No results to parse at %s' % self._autoserv_monitor.pidfile_id)
            self.finished(False)
            return

        # actually run the parse command
        self.monitor = PidfileRunMonitor()
        self.monitor.run(self.cmd, self._working_directory,
                         log_file=self.log_file,
                         pidfile_name='.parser_execute',
                         paired_with_pidfile=self._autoserv_monitor.pidfile_id)

        self._increment_running_parses()
        self._parse_started = True


    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        # release our slot only if we actually started a parse
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001767
1768
class SetEntryPendingTask(AgentTask):
    """A no-op-command task that flips a queue entry into the Pending state."""

    def __init__(self, queue_entry):
        super(SetEntryPendingTask, self).__init__(cmd='')
        self._queue_entry = queue_entry
        self._set_ids(queue_entries=[queue_entry])


    def run(self):
        """Mark the entry pending; dispatch any follow-up agent it yields."""
        next_agent = self._queue_entry.on_pending()
        if next_agent:
            self.agent.dispatcher.add_agent(next_agent)
        self.finished(True)
1781
1782
class DBError(Exception):
    """Signals that a DBObject could not be loaded from the database."""
1785
1786
class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        # object.__new__ takes no extra arguments; forwarding id/kwargs to it
        # is unnecessary (and deprecated) -- __init__ still receives them.
        return super(DBObject, cls).__new__(cls)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id - primary key to SELECT by; mutually exclusive with row.
        @param row - pre-fetched sequence of _fields values; skips the query.
        @param new_record - True if this instance is not yet in the database.
        @param always_query - if False, an already-initialized cached
                instance returns immediately without re-querying.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """SELECT and return the single row with primary key row_id.

        @raises DBError if no such row exists.
        """
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            # bug fix: the message previously interpolated the builtin id()
            # function instead of the row id being looked up
            raise DBError("row not found (table=%s, id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in memory fields.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is never updated through update_field()
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read this row from the database and refresh our attributes."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        """Return the number of rows in `table` matching the WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value, condition=''):
        """UPDATE a single field of this row (no-op if unchanged)."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        if condition:
            query += ' AND (%s)' % condition
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this instance as a new row and record its assigned id."""
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            # NOTE(review): values are interpolated with naive double-quoting
            # rather than parameterized queries -- unsafe if a field value can
            # contain quotes; left as-is to preserve existing behavior.
            values = ['"%s"' % self.__dict__[key] for key in keys]
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """DELETE this row and evict the instance from the shared cache."""
        # bug fix: evict by self.id -- the old key (type(self), id) used the
        # builtin id() function and therefore never matched a cache entry
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return `string` prefixed with `prefix`, or unchanged if falsy."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        for row in rows:
            yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00001981
mbligh36768f02008-02-22 18:28:33 +00001982
class IneligibleHostQueue(DBObject):
    """ORM wrapper for the ineligible_host_queues table."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00001986
1987
class AtomicGroup(DBObject):
    """ORM wrapper for the atomic_groups table."""
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00001992
1993
class Label(DBObject):
    """ORM wrapper for the labels table."""
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00001998
1999
class Host(DBObject):
    """A row from the hosts table, plus scheduler-side helper methods."""

    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def current_task(self):
        """Return this host's active, incomplete HostQueueEntry, if any."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                """, (self.id,))

        if not rows:
            return None
        assert len(rows) == 1
        return HostQueueEntry(row=rows[0])


    def yield_work(self):
        """Requeue whatever entry this host is currently working on."""
        logging.info("%s yielding work", self.hostname)
        if self.current_task():
            self.current_task().requeue()


    def set_status(self, status):
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for name, is_platform in rows:
            all_labels.append(name)
            if is_platform:
                platform = name
        return platform, all_labels


    def reverify_tasks(self):
        """Return [CleanupTask, VerifyTask] to re-verify this host."""
        tasks = [CleanupTask(host=self), VerifyTask(host=self)]
        # just to make sure this host does not get taken away
        self.set_status('Cleaning')
        return tasks
showardd8e548a2008-09-09 03:04:57 +00002056
2057
class HostQueueEntry(DBObject):
    """
    Wraps a row of the 'host_queue_entries' table: one (job, host)
    pairing, tracking its scheduling status, execution directory and
    abort bookkeeping.
    """
    _table_name = 'host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id')


    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        # Eagerly load the owning Job; the Host is loaded only when one
        # has actually been assigned (metahost entries start unassigned).
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        """Return the frontend URL for viewing this entry's job."""
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def set_host(self, host):
        """
        Assign `host` to this entry (marking it active and blocking the
        host against other entries of the same job), or release the
        current host when `host` is None.
        """
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        """Return the Host assigned to this entry, or None."""
        return self.host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log file."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Record an ineligibility block so this job won't reuse host_id."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any ineligibility blocks for (this job, host_id)."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """
        Set the results subdirectory for this entry.  When subdir is None
        the assigned host's hostname is used (a host must be assigned).
        """
        if subdir is None:
            assert self.get_host()
            subdir = self.get_host().hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        """Return the assigned host's name, or 'no host' for log/email use."""
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)


    def set_status(self, status):
        """
        Update this entry's status, keeping the derived 'active' and
        'complete' flags in sync and firing status/completion emails.
        Non-abort statuses never overwrite an abort status (enforced via
        the SQL condition below).
        """
        abort_statuses = ['Abort', 'Aborting', 'Aborted']
        if status not in abort_statuses:
            condition = ' AND '.join(['status <> "%s"' % x
                                      for x in abort_statuses])
        else:
            condition = ''
        self.update_field('status', status, condition=condition)

        logging.info("%s -> %s", self, self.status)

        if status in ['Queued', 'Parsing']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Aborting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _email_on_status(self, status):
        """Email the job's notification list about this status change."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """
        If the whole job has now finished, email a per-host summary and
        overall status counts to the job's notification list.
        """
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def run(self, assigned_host=None):
        """
        Start this entry running.  Metahost/atomic-group entries must be
        given a concrete host here.  Delegates to the owning Job, which
        returns the Agent to execute (or None).
        """
        if self.meta_host is not None or self.atomic_group_id is not None:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.set_host(assigned_host)

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        return self.job.run(queue_entry=self)


    def requeue(self):
        """Return this entry to the queue (releasing a metahost's host)."""
        self.set_status('Queued')
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        self.job.stop_if_necessary()


    @property
    def aborted_by(self):
        # Login of the user who aborted this entry, or None.
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # Timestamp of the abort, or None.
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached after the first call via the _aborted_by attribute.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify. If the
        job is ready to run, returns an agent to run the job. Returns None
        otherwise.
        """
        self.set_status('Pending')
        self.get_host().set_status('Pending')
        if self.job.is_ready():
            return self.job.run(self)
        self.job.stop_if_necessary()
        return None


    def abort(self, dispatcher, agents_to_abort=None):
        """
        Abort this entry: schedule reverify tasks for its host (if the
        entry is active on one) and queue an AbortTask to tear down the
        given agents.
        """
        # Avoid the shared mutable-default-argument pitfall: the old
        # signature used agents_to_abort=[].
        if agents_to_abort is None:
            agents_to_abort = []
        host = self.get_host()
        if self.active and host:
            dispatcher.add_agent(Agent(tasks=host.reverify_tasks()))

        abort_task = AbortTask(self, agents_to_abort)
        self.set_status('Aborting')
        dispatcher.add_agent(Agent(tasks=[abort_task], num_processes=0))


    def execution_tag(self):
        """Return the '<job id>-<owner>/<subdir>' results path fragment."""
        assert self.execution_subdir
        return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002308
2309
class Job(DBObject):
    """
    Wraps a row of the 'jobs' table.  Responsible for deciding when a
    (possibly synchronous) job is ready, building pre-job task lists
    (cleanup/verify) and launching the autoserv process for a group of
    queue entries.
    """
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after')


    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)


    def is_server_job(self):
        # control_type 2 is a client-side job; everything else runs on the
        # server.
        return self.control_type != 2


    def tag(self):
        """Return the '<id>-<owner>' directory tag for this job."""
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        """Return all HostQueueEntry objects belonging to this job."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries)>0

        return entries


    def set_status(self, status, update_queues=False):
        """
        Set this job's status; when update_queues is True propagate it to
        every queue entry as well.
        """
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_ready(self):
        """True when enough entries are Pending to satisfy synch_count."""
        pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
                                                               status='Pending')
        return (pending_entries.count() >= self.synch_count)


    def num_machines(self, clause=None):
        """Count this job's queue entries, optionally filtered by clause."""
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        """Return a queryset of this job's Queued/Pending (and optionally
        Verifying) entries."""
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Mark all not-yet-running entries Stopped, freeing Pending hosts."""
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
        """Stop the job when too few entries remain to satisfy synch_count."""
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        """Append the entry's hostname to the job's .machines file."""
        hostname = queue_entry.get_host().hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self):
        """Return the next unused 'groupN' execution subdir name."""
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in groups if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return "group%d" % next_id


    def _write_control_file(self, execution_tag):
        """Write the control file into the execution dir; return its path."""
        control_path = _drone_manager.attach_file_to_execution(
                execution_tag, self.control_file)
        return control_path


    def get_group_entries(self, queue_entry_from_group):
        """Return all entries sharing queue_entry_from_group's subdir."""
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _get_autoserv_params(self, queue_entries):
        """Build the autoserv command-line argv for these entries."""
        assert queue_entries
        execution_tag = queue_entries[0].execution_tag()
        control_path = self._write_control_file(execution_tag)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = [_autoserv_path, '-P', execution_tag, '-p', '-n',
                  '-r', _drone_manager.absolute_path(execution_tag),
                  '-u', self.owner, '-l', self.name, '-m', hostnames,
                  _drone_manager.absolute_path(control_path)]

        if not self.is_server_job():
            params.append('-c')

        return params


    def _should_run_cleanup(self, queue_entry):
        """Decide per reboot_before policy whether to schedule a cleanup."""
        if self.reboot_before == models.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == models.RebootBefore.IF_DIRTY:
            return queue_entry.get_host().dirty
        return False


    def _should_run_verify(self, queue_entry):
        """Verify unless the host's protection level forbids it."""
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def _get_pre_job_tasks(self, queue_entry):
        """Return [optional cleanup, optional verify, pending] task list."""
        tasks = []
        if self._should_run_cleanup(queue_entry):
            tasks.append(CleanupTask(queue_entry=queue_entry))
        if self._should_run_verify(queue_entry):
            tasks.append(VerifyTask(queue_entry=queue_entry))
        tasks.append(SetEntryPendingTask(queue_entry))
        return tasks


    def _assign_new_group(self, queue_entries):
        """Give the entries a shared execution subdir (hostname or groupN)."""
        if len(queue_entries) == 1:
            group_name = queue_entries[0].get_host().hostname
        else:
            group_name = self._next_group_name()
            logging.info('Running synchronous job %d hosts %s as %s',
                         self.id,
                         [entry.host.hostname for entry in queue_entries],
                         group_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        Pick up to synch_count entries (always including
        include_queue_entry) and assign them a fresh group.
        """
        chosen_entries = [include_queue_entry]

        num_entries_needed = self.synch_count - 1
        if num_entries_needed > 0:
            pending_entries = HostQueueEntry.fetch(
                where='job_id = %s AND status = "Pending" AND id != %s',
                params=(self.id, include_queue_entry.id))
            chosen_entries += list(pending_entries)[:num_entries_needed]

        self._assign_new_group(chosen_entries)
        return chosen_entries


    def run(self, queue_entry):
        """
        Run pre-job tasks for queue_entry if the job is not yet ready;
        otherwise pick a group and launch autoserv.  Returns an Agent.
        """
        if not self.is_ready():
            queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
            return Agent(self._get_pre_job_tasks(queue_entry))

        queue_entries = self._choose_group_to_run(queue_entry)
        return self._finish_run(queue_entries)


    def _finish_run(self, queue_entries, initial_tasks=None):
        """
        Mark the entries Starting and return an Agent running
        initial_tasks followed by the QueueTask for this group.
        """
        # None-sentinel instead of a mutable [] default.
        if initial_tasks is None:
            initial_tasks = []
        for queue_entry in queue_entries:
            queue_entry.set_status('Starting')
        params = self._get_autoserv_params(queue_entries)
        queue_task = QueueTask(job=self, queue_entries=queue_entries,
                               cmd=params)
        tasks = initial_tasks + [queue_task]
        # (The original also built an unused entry_ids list here; removed.)

        return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002530
2531
if __name__ == '__main__':
    # Script entry point; main() is defined earlier in this file.
    main()