blob: ab9ec4258fa00c87816d5c2d1de4d5e458546235 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard89f84db2009-03-12 20:39:13 +00008import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard21baa452008-10-21 00:08:39 +000017from autotest_lib.frontend.afe import models
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
showardd1ee1dd2009-01-07 21:33:08 +000019from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000020
mblighb090f142008-02-27 21:33:46 +000021
# Default results directory; replaced by the command-line results_dir argument
# in main_without_exception_handling().
RESULTS_DIR = '.'
# nice(2) level applied to spawned autoserv processes.
AUTOSERV_NICE_LEVEL = 10
# global_config section holding the AFE database settings.
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

# Autotest install root, derived from this file's location unless overridden
# by $AUTOTEST_DIR below.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

# make the server/ package importable regardless of how we were launched
if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None              # DatabaseConnection, created in init()
_shutdown = False       # set by handle_sigint() to stop the main loop
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False   # when True, use a dummy autoserv and a test database
_base_url = None        # AFE frontend URL, configured in main
_notify_email_statuses = []  # lowercase status names that trigger email
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
if not os.environ.has_key('AUTOTEST_SCHEDULER_LOG_DIR'):
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
if os.environ.has_key('AUTOTEST_SCHEDULER_LOG_NAME'):
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

# debug_scheduler.ini consumes the two environment variables exported above
logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
68
mbligh36768f02008-02-22 18:28:33 +000069
def main():
    """Entry point: run the scheduler, logging any exception that escapes.

    The bare except is deliberate — any exception (including SystemExit and
    KeyboardInterrupt) is recorded with a traceback before being re-raised.
    """
    try:
        main_without_exception_handling()
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
76
77
def main_without_exception_handling():
    """Parse options, configure module globals and run the dispatch loop.

    Side effects: chdirs into the results directory, mutates the module
    globals RESULTS_DIR, _notify_email_statuses, _autoserv_path,
    _testing_mode and _base_url, starts the status server, and runs the
    dispatcher tick loop until _shutdown is set by the SIGINT handler.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # the config value is a list separated by any of whitespace , ; or :
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # report the failure by email; shutdown proceeds below regardless
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000153
154
def handle_sigint(signum, frame):
    """SIGINT handler: ask the dispatcher loop to exit after its current tick."""
    global _shutdown
    logging.info("Shutdown request received.")
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000159
160
def init(logfile):
    """One-time scheduler startup.

    Redirects output to *logfile* (if given), writes the pid file, connects
    the global database handle, enables Django autocommit, installs the
    SIGINT handler and initializes the drone manager from the config file.

    @param logfile - path for stdout redirection, or a false value to skip.
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # point the DB layer at the stress-test database instead of production
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # NOTE(review): this local 'drones' shadows the imported drones module for
    # the rest of this function — intentional-looking but fragile.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000192
193
def enable_logging(logfile):
    """Redirect stdout to *logfile* and stderr to *logfile*.err.

    Both files are opened unbuffered in append mode. The underlying file
    descriptors are dup2'd so output from child processes is captured too,
    then the Python-level stream objects are rebound to match.
    """
    stdout_path = logfile
    stderr_path = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", stdout_path, stderr_path)

    stdout_stream = open(stdout_path, "a", buffering=0)
    stderr_stream = open(stderr_path, "a", buffering=0)

    os.dup2(stdout_stream.fileno(), sys.stdout.fileno())
    os.dup2(stderr_stream.fileno(), sys.stderr.fileno())
    sys.stdout = stdout_stream
    sys.stderr = stderr_stream
mbligh36768f02008-02-22 18:28:33 +0000206
207
def queue_entries_to_abort():
    """Return a HostQueueEntry object for every row currently in 'Abort'."""
    rows = _db.execute("""
                    SELECT * FROM host_queue_entries WHERE status='Abort';
                    """)
    return [HostQueueEntry(row=row) for row in rows]
mbligh36768f02008-02-22 18:28:33 +0000215
showard7cf9a9b2008-05-15 21:15:52 +0000216
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs.

    For example, when a host carries more than one atomic group label.
    """
219
220
class HostScheduler(object):
    """Matches pending host queue entries against ready hosts.

    refresh() snapshots per-cycle scheduling state from the database (ready
    hosts, job/host ACLs, label memberships, job dependencies); the
    find_eligible_* methods then consume that cached state, popping hosts
    out of _hosts_available as they are handed to queue entries so no host
    is scheduled twice within one cycle.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for all usable, idle, unlocked hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render an id sequence as a comma-separated string for SQL IN ()."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run *query* (with a %s IN-list placeholder) over *id_list*.

        @returns {left_id: set(right_ids)} built from the two-column result;
                 with flip=True the columns are swapped. Empty id_list short
                 circuits to {} (an empty IN () would be invalid SQL).
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold two-column rows into {left_id: set(right_ids)} (flip swaps)."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Return {job_id: set(aclgroup_ids)} via each job owner's ACL rows."""
        query = """
            SELECT jobs.id, acl_groups_users.aclgroup_id
            FROM jobs
            INNER JOIN users ON users.login = jobs.owner
            INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
            WHERE jobs.id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Return {job_id: set(host_ids)} of hosts blocked for each job."""
        query = """
            SELECT job_id, host_id
            FROM ineligible_host_queues
            WHERE job_id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Return {job_id: set(label_ids)} of each job's dependency labels."""
        query = """
            SELECT job_id, label_id
            FROM jobs_dependency_labels
            WHERE job_id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Return {host_id: set(aclgroup_ids)} for the given hosts."""
        query = """
            SELECT host_id, aclgroup_id
            FROM acl_groups_hosts
            WHERE host_id IN (%s)
            """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (labels_to_hosts, hosts_to_labels) mappings for *host_ids*."""
        if not host_ids:
            return {}, {}
        query = """
            SELECT label_id, host_id
            FROM hosts_labels
            WHERE host_id IN (%s)
            """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        # same result rows folded both ways to give forward and reverse maps
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Reload all cached scheduling state for a new scheduling cycle.

        @param pending_queue_entries - the entries the dispatcher intends to
            schedule this cycle; only their jobs' ACLs/dependencies are loaded.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host has an only_if_needed label the job neither
        depends on nor requested via its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        # both sides may be None (no atomic group), which also compares equal
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise. Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                 or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                 supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True if the host passes ACL, dependency, only_if_needed and
        atomic-group checks for this queue entry."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's specific host if eligible, else return None."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True

    def _schedule_metahost(self, queue_entry):
        """Claim any eligible host carrying the entry's metahost label.

        @returns a Host popped from _hosts_available, or None if no host in
                 the label is currently usable and eligible.
        """
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Find and claim a single host for *queue_entry*, or return None.

        Dispatches to the specific-host or metahost path; atomic group
        entries must go through find_eligible_atomic_group() instead.
        """
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling. Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to. We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend. Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts. The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = random.sample(
                    eligible_hosts_in_group, max_hosts)

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
565
566
showard170873e2009-01-07 00:22:26 +0000567class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000568 def __init__(self):
569 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000570 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000571 self._host_scheduler = HostScheduler()
showard170873e2009-01-07 00:22:26 +0000572 self._host_agents = {}
573 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000574
mbligh36768f02008-02-22 18:28:33 +0000575
jadmanski0afbb632008-06-06 21:10:57 +0000576 def do_initial_recovery(self, recover_hosts=True):
577 # always recover processes
578 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000579
jadmanski0afbb632008-06-06 21:10:57 +0000580 if recover_hosts:
581 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000582
583
    def tick(self):
        """Run one scheduler iteration.

        Order matters: refresh drone state first, then periodic cleanup,
        abort handling and new-job scheduling, then let agents make progress;
        finally flush queued drone actions and any pending emails.
        """
        _drone_manager.refresh()
        self._run_cleanup_maybe()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000592
showard97aed502008-11-04 02:01:24 +0000593
showarda3ab0d52008-11-03 19:03:47 +0000594 def _run_cleanup_maybe(self):
showardd1ee1dd2009-01-07 21:33:08 +0000595 should_cleanup = (self._last_clean_time +
596 scheduler_config.config.clean_interval * 60 <
597 time.time())
598 if should_cleanup:
showardb18134f2009-03-20 20:52:18 +0000599 logging.info('Running cleanup')
showarda3ab0d52008-11-03 19:03:47 +0000600 self._abort_timed_out_jobs()
601 self._abort_jobs_past_synch_start_timeout()
602 self._clear_inactive_blocks()
showardfa8629c2008-11-04 16:51:23 +0000603 self._check_for_db_inconsistencies()
showarda3ab0d52008-11-03 19:03:47 +0000604 self._last_clean_time = time.time()
605
mbligh36768f02008-02-22 18:28:33 +0000606
showard170873e2009-01-07 00:22:26 +0000607 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
608 for object_id in object_ids:
609 agent_dict.setdefault(object_id, set()).add(agent)
610
611
612 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
613 for object_id in object_ids:
614 assert object_id in agent_dict
615 agent_dict[object_id].remove(agent)
616
617
jadmanski0afbb632008-06-06 21:10:57 +0000618 def add_agent(self, agent):
619 self._agents.append(agent)
620 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000621 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
622 self._register_agent_for_ids(self._queue_entry_agents,
623 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000624
showard170873e2009-01-07 00:22:26 +0000625
626 def get_agents_for_entry(self, queue_entry):
627 """
628 Find agents corresponding to the specified queue_entry.
629 """
630 return self._queue_entry_agents.get(queue_entry.id, set())
631
632
633 def host_has_agent(self, host):
634 """
635 Determine if there is currently an Agent present using this host.
636 """
637 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000638
639
jadmanski0afbb632008-06-06 21:10:57 +0000640 def remove_agent(self, agent):
641 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000642 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
643 agent)
644 self._unregister_agent_for_ids(self._queue_entry_agents,
645 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000646
647
showard4c5374f2008-09-04 17:02:56 +0000648 def num_running_processes(self):
649 return sum(agent.num_processes for agent in self._agents
650 if agent.is_running())
mblighbb421852008-03-11 22:36:16 +0000651
652
showard170873e2009-01-07 00:22:26 +0000653 def _extract_execution_tag(self, command_line):
654 match = re.match(r'.* -P (\S+) ', command_line)
655 if not match:
656 return None
657 return match.group(1)
mblighbb421852008-03-11 22:36:16 +0000658
659
showard2bab8f42008-11-12 18:15:22 +0000660 def _recover_queue_entries(self, queue_entries, run_monitor):
661 assert len(queue_entries) > 0
showard2bab8f42008-11-12 18:15:22 +0000662 queue_task = RecoveryQueueTask(job=queue_entries[0].job,
663 queue_entries=queue_entries,
664 run_monitor=run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +0000665 self.add_agent(Agent(tasks=[queue_task],
showard170873e2009-01-07 00:22:26 +0000666 num_processes=len(queue_entries)))
mblighbb421852008-03-11 22:36:16 +0000667
668
    def _recover_processes(self):
        """Recover all scheduler state after a restart.

        Steps run in a fixed order: register pidfiles and refresh drone
        state so process info is readable, then recover running/aborting
        entries, requeue anything else active, resume parsing, and reverify
        leftover hosts before cleaning up the drones themselves.
        """
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_running_entries()
        self._recover_aborting_entries()
        self._requeue_other_active_entries()
        self._recover_parsing_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000681
showard170873e2009-01-07 00:22:26 +0000682
683 def _register_pidfiles(self):
684 # during recovery we may need to read pidfiles for both running and
685 # parsing entries
686 queue_entries = HostQueueEntry.fetch(
687 where="status IN ('Running', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000688 for queue_entry in queue_entries:
showard170873e2009-01-07 00:22:26 +0000689 pidfile_id = _drone_manager.get_pidfile_id_from(
690 queue_entry.execution_tag())
691 _drone_manager.register_pidfile(pidfile_id)
692
693
694 def _recover_running_entries(self):
695 orphans = _drone_manager.get_orphaned_autoserv_processes()
696
697 queue_entries = HostQueueEntry.fetch(where="status = 'Running'")
698 requeue_entries = []
699 for queue_entry in queue_entries:
700 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000701 # synchronous job we've already recovered
702 continue
showard170873e2009-01-07 00:22:26 +0000703 execution_tag = queue_entry.execution_tag()
704 run_monitor = PidfileRunMonitor()
705 run_monitor.attach_to_existing_process(execution_tag)
706 if not run_monitor.has_process():
707 # autoserv apparently never got run, so let it get requeued
708 continue
showarde788ea62008-11-17 21:02:47 +0000709 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showardb18134f2009-03-20 20:52:18 +0000710 logging.info('Recovering %s (process %s)',
711 (', '.join(str(entry) for entry in queue_entries),
712 run_monitor.get_process()))
showard2bab8f42008-11-12 18:15:22 +0000713 self._recover_queue_entries(queue_entries, run_monitor)
showard170873e2009-01-07 00:22:26 +0000714 orphans.pop(execution_tag, None)
mbligh90a549d2008-03-25 23:52:34 +0000715
jadmanski0afbb632008-06-06 21:10:57 +0000716 # now kill any remaining autoserv processes
showard170873e2009-01-07 00:22:26 +0000717 for process in orphans.itervalues():
showardb18134f2009-03-20 20:52:18 +0000718 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000719 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000720
showard170873e2009-01-07 00:22:26 +0000721
722 def _recover_aborting_entries(self):
723 queue_entries = HostQueueEntry.fetch(
724 where='status IN ("Abort", "Aborting")')
jadmanski0afbb632008-06-06 21:10:57 +0000725 for queue_entry in queue_entries:
showardb18134f2009-03-20 20:52:18 +0000726 logging.info('Recovering aborting QE %s', queue_entry)
showard170873e2009-01-07 00:22:26 +0000727 agent = queue_entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000728
showard97aed502008-11-04 02:01:24 +0000729
showard170873e2009-01-07 00:22:26 +0000730 def _requeue_other_active_entries(self):
731 queue_entries = HostQueueEntry.fetch(
732 where='active AND NOT complete AND status != "Pending"')
733 for queue_entry in queue_entries:
734 if self.get_agents_for_entry(queue_entry):
735 # entry has already been recovered
736 continue
showardb18134f2009-03-20 20:52:18 +0000737 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
738 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000739 if queue_entry.host:
740 tasks = queue_entry.host.reverify_tasks()
741 self.add_agent(Agent(tasks))
742 agent = queue_entry.requeue()
743
744
745 def _reverify_remaining_hosts(self):
showard45ae8192008-11-05 19:32:53 +0000746 # reverify hosts that were in the middle of verify, repair or cleanup
jadmanski0afbb632008-06-06 21:10:57 +0000747 self._reverify_hosts_where("""(status = 'Repairing' OR
748 status = 'Verifying' OR
showard170873e2009-01-07 00:22:26 +0000749 status = 'Cleaning')""")
jadmanski0afbb632008-06-06 21:10:57 +0000750
showard170873e2009-01-07 00:22:26 +0000751 # recover "Running" hosts with no active queue entries, although this
752 # should never happen
753 message = ('Recovering running host %s - this probably indicates a '
754 'scheduler bug')
jadmanski0afbb632008-06-06 21:10:57 +0000755 self._reverify_hosts_where("""status = 'Running' AND
756 id NOT IN (SELECT host_id
757 FROM host_queue_entries
758 WHERE active)""",
759 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000760
761
jadmanski0afbb632008-06-06 21:10:57 +0000762 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000763 print_message='Reverifying host %s'):
764 full_where='locked = 0 AND invalid = 0 AND ' + where
765 for host in Host.fetch(where=full_where):
766 if self.host_has_agent(host):
767 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000768 continue
showard170873e2009-01-07 00:22:26 +0000769 if print_message:
showardb18134f2009-03-20 20:52:18 +0000770 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000771 tasks = host.reverify_tasks()
772 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000773
774
showard97aed502008-11-04 02:01:24 +0000775 def _recover_parsing_entries(self):
showard2bab8f42008-11-12 18:15:22 +0000776 recovered_entry_ids = set()
showard97aed502008-11-04 02:01:24 +0000777 for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
showard2bab8f42008-11-12 18:15:22 +0000778 if entry.id in recovered_entry_ids:
779 continue
780 queue_entries = entry.job.get_group_entries(entry)
showard170873e2009-01-07 00:22:26 +0000781 recovered_entry_ids = recovered_entry_ids.union(
782 entry.id for entry in queue_entries)
showardb18134f2009-03-20 20:52:18 +0000783 logging.info('Recovering parsing entries %s',
784 (', '.join(str(entry) for entry in queue_entries)))
showard97aed502008-11-04 02:01:24 +0000785
786 reparse_task = FinalReparseTask(queue_entries)
showard170873e2009-01-07 00:22:26 +0000787 self.add_agent(Agent([reparse_task], num_processes=0))
showard97aed502008-11-04 02:01:24 +0000788
789
jadmanski0afbb632008-06-06 21:10:57 +0000790 def _recover_hosts(self):
791 # recover "Repair Failed" hosts
792 message = 'Reverifying dead host %s'
793 self._reverify_hosts_where("status = 'Repair Failed'",
794 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000795
796
showard3bb499f2008-07-03 19:42:20 +0000797 def _abort_timed_out_jobs(self):
798 """
799 Aborts all jobs that have timed out and not completed
800 """
showarda3ab0d52008-11-03 19:03:47 +0000801 query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
802 where=['created_on + INTERVAL timeout HOUR < NOW()'])
803 for job in query.distinct():
showardb18134f2009-03-20 20:52:18 +0000804 logging.warning('Aborting job %d due to job timeout', job.id)
showarda3ab0d52008-11-03 19:03:47 +0000805 job.abort(None)
showard3bb499f2008-07-03 19:42:20 +0000806
807
showard98863972008-10-29 21:14:56 +0000808 def _abort_jobs_past_synch_start_timeout(self):
809 """
810 Abort synchronous jobs that are past the start timeout (from global
811 config) and are holding a machine that's in everyone.
812 """
813 timeout_delta = datetime.timedelta(
showardd1ee1dd2009-01-07 21:33:08 +0000814 minutes=scheduler_config.config.synch_job_start_timeout_minutes)
showard98863972008-10-29 21:14:56 +0000815 timeout_start = datetime.datetime.now() - timeout_delta
816 query = models.Job.objects.filter(
showard98863972008-10-29 21:14:56 +0000817 created_on__lt=timeout_start,
818 hostqueueentry__status='Pending',
showardd9ac4452009-02-07 02:04:37 +0000819 hostqueueentry__host__aclgroup__name='Everyone')
showard98863972008-10-29 21:14:56 +0000820 for job in query.distinct():
showardb18134f2009-03-20 20:52:18 +0000821 logging.warning('Aborting job %d due to start timeout', job.id)
showardff059d72008-12-03 18:18:53 +0000822 entries_to_abort = job.hostqueueentry_set.exclude(
823 status=models.HostQueueEntry.Status.RUNNING)
824 for queue_entry in entries_to_abort:
825 queue_entry.abort(None)
showard98863972008-10-29 21:14:56 +0000826
827
    def _clear_inactive_blocks(self):
        """
        Clear out blocks for all completed jobs.

        Deletes ineligible_host_queues rows whose job has no incomplete
        host_queue_entries left.
        """
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        _db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")
showard04c82c52008-05-29 19:38:12 +0000840
841
showardb95b1bd2008-08-15 18:11:04 +0000842 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000843 # prioritize by job priority, then non-metahost over metahost, then FIFO
844 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000845 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000846 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000847 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000848
849
showard89f84db2009-03-12 20:39:13 +0000850 def _refresh_pending_queue_entries(self):
851 """
852 Lookup the pending HostQueueEntries and call our HostScheduler
853 refresh() method given that list. Return the list.
854
855 @returns A list of pending HostQueueEntries sorted in priority order.
856 """
showard63a34772008-08-18 19:32:50 +0000857 queue_entries = self._get_pending_queue_entries()
858 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000859 return []
showardb95b1bd2008-08-15 18:11:04 +0000860
showard63a34772008-08-18 19:32:50 +0000861 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000862
showard89f84db2009-03-12 20:39:13 +0000863 return queue_entries
864
865
866 def _schedule_atomic_group(self, queue_entry):
867 """
868 Schedule the given queue_entry on an atomic group of hosts.
869
870 Returns immediately if there are insufficient available hosts.
871
872 Creates new HostQueueEntries based off of queue_entry for the
873 scheduled hosts and starts them all running.
874 """
875 # This is a virtual host queue entry representing an entire
876 # atomic group, find a group and schedule their hosts.
877 group_hosts = self._host_scheduler.find_eligible_atomic_group(
878 queue_entry)
879 if not group_hosts:
880 return
881 # The first assigned host uses the original HostQueueEntry
882 group_queue_entries = [queue_entry]
883 for assigned_host in group_hosts[1:]:
884 # Create a new HQE for every additional assigned_host.
885 new_hqe = HostQueueEntry.clone(queue_entry)
886 new_hqe.save()
887 group_queue_entries.append(new_hqe)
888 assert len(group_queue_entries) == len(group_hosts)
889 for queue_entry, host in itertools.izip(group_queue_entries,
890 group_hosts):
891 self._run_queue_entry(queue_entry, host)
892
893
894 def _schedule_new_jobs(self):
895 queue_entries = self._refresh_pending_queue_entries()
896 if not queue_entries:
897 return
898
showard63a34772008-08-18 19:32:50 +0000899 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000900 if (queue_entry.atomic_group_id is None or
901 queue_entry.host_id is not None):
902 assigned_host = self._host_scheduler.find_eligible_host(
903 queue_entry)
904 if assigned_host:
905 self._run_queue_entry(queue_entry, assigned_host)
906 else:
907 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000908
909
910 def _run_queue_entry(self, queue_entry, host):
911 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000912 # in some cases (synchronous jobs with run_verify=False), agent may be
913 # None
showard9976ce92008-10-15 20:28:13 +0000914 if agent:
915 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000916
917
jadmanski0afbb632008-06-06 21:10:57 +0000918 def _find_aborting(self):
jadmanski0afbb632008-06-06 21:10:57 +0000919 for entry in queue_entries_to_abort():
showard170873e2009-01-07 00:22:26 +0000920 agents_to_abort = list(self.get_agents_for_entry(entry))
showard1be97432008-10-17 15:30:45 +0000921 for agent in agents_to_abort:
922 self.remove_agent(agent)
923
showard170873e2009-01-07 00:22:26 +0000924 entry.abort(self, agents_to_abort)
jadmanski0afbb632008-06-06 21:10:57 +0000925
926
showard324bf812009-01-20 23:23:38 +0000927 def _can_start_agent(self, agent, num_started_this_cycle,
928 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000929 # always allow zero-process agents to run
930 if agent.num_processes == 0:
931 return True
932 # don't allow any nonzero-process agents to run after we've reached a
933 # limit (this avoids starvation of many-process agents)
934 if have_reached_limit:
935 return False
936 # total process throttling
showard324bf812009-01-20 23:23:38 +0000937 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000938 return False
939 # if a single agent exceeds the per-cycle throttling, still allow it to
940 # run when it's the first agent in the cycle
941 if num_started_this_cycle == 0:
942 return True
943 # per-cycle throttling
944 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000945 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000946 return False
947 return True
948
949
    def _handle_agents(self):
        """
        One scheduling pass over all agents: remove finished agents,
        start eligible idle ones (subject to process throttling via
        _can_start_agent), and tick every agent that is or just became
        running.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                # once one agent is refused, have_reached_limit stays set,
                # so no later process-bearing agent starts this cycle
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000968
969
showardfa8629c2008-11-04 16:51:23 +0000970 def _check_for_db_inconsistencies(self):
971 query = models.HostQueueEntry.objects.filter(active=True, complete=True)
972 if query.count() != 0:
973 subject = ('%d queue entries found with active=complete=1'
974 % query.count())
975 message = '\n'.join(str(entry.get_object_dict())
976 for entry in query[:50])
977 if len(query) > 50:
978 message += '\n(truncated)\n'
979
showardb18134f2009-03-20 20:52:18 +0000980 logging.error(subject)
showard170873e2009-01-07 00:22:26 +0000981 email_manager.manager.enqueue_notify_email(subject, message)
showardfa8629c2008-11-04 16:51:23 +0000982
983
class PidfileRunMonitor(object):
    """
    Tracks an autoserv process through its pidfile, via the drone manager.

    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # set True once we've given up on the process; see on_lost_process()
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        """Return command wrapped in 'nice -n <level>'; unchanged when
        nice_level is None or 0 ('nice -n 0' would change nothing)."""
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        # recorded so _handle_no_process() can enforce PIDFILE_TIMEOUT
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """Launch command through the drone manager and begin monitoring
        its pidfile."""
        assert command is not None
        # reuse the helper instead of duplicating the nice-wrapping inline
        # (the helper was otherwise dead code)
        command = self._add_nice_command(command, nice_level)
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, log_file=log_file,
            pidfile_name=pidfile_name, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag):
        """Begin monitoring an already-running process identified by its
        execution tag."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(execution_tag)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        """Kill the monitored process, if one exists."""
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        """Refresh self._state from the pidfile; raises _PidfileException
        on unparseable contents."""
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            # reset state so stale data can't be trusted after the raise
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        # log + email the problem, then treat the process as lost
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException:
            # the bound exception value was never used; dropping the
            # binding also keeps the syntax forward-compatible
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
        self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001139
1140
class Agent(object):
    """
    Runs a chain of AgentTasks sequentially on behalf of the dispatcher.
    Aggregates the tasks' host and queue-entry ids so the dispatcher can
    find the agent by either.  On a task failure, the remaining queue is
    dropped and the failed task's failure_tasks run in a fresh Agent.
    """

    def __init__(self, tasks, num_processes=1):
        # nothing runs until start()/tick() pulls the first task
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.num_processes = num_processes

        # union the per-task id lists for dispatcher lookups
        self.queue_entry_ids = self._union_ids(each.queue_entry_ids
                                               for each in tasks)
        self.host_ids = self._union_ids(each.host_ids for each in tasks)

        for each in tasks:
            self.add_task(each)


    def _union_ids(self, id_lists):
        # flatten the iterable of id lists into a single set
        union = set()
        for id_list in id_lists:
            union.update(id_list)
        return union


    def add_task(self, task):
        task.agent = self
        self.queue.put_nowait(task)


    def tick(self):
        """Poll the active task; when it finishes, keep advancing through
        queued tasks until one is left running or all are done."""
        while not self.is_done():
            current = self.active_task
            if current and not current.is_done():
                current.poll()
                if not current.is_done():
                    return
            self._next_task()


    def _next_task(self):
        logging.info("agent picking task")
        finished_task = self.active_task
        if finished_task:
            assert finished_task.is_done()

            if not finished_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        # discard everything still queued
        self.queue = Queue.Queue(0)
        # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
        # get reset.
        self.dispatcher.add_agent(Agent(self.active_task.failure_tasks))


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001209
jadmanski0afbb632008-06-06 21:10:57 +00001210
class AgentTask(object):
    """
    Base class for one unit of work run by an Agent.  Subclasses override
    prolog()/epilog() and supply an autoserv command line; tasks with no
    command (cmd=None) fail immediately on poll().
    """

    def __init__(self, cmd, working_directory=None, failure_tasks=None):
        """
        @param cmd: command list to execute, or None.
        @param working_directory: directory the command runs in.
        @param failure_tasks: tasks run in a fresh Agent if this one fails.
                Defaults to no tasks.  (Previously this defaulted to a
                shared mutable list, aliased across every instance.)
        """
        if failure_tasks is None:
            failure_tasks = []
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        # queue_entries may be [None]; in that case fall back to the host
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        # with no monitor there is no process to wait on -- fail the task
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        if exit_code is None:
            return  # no exit status yet
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def create_temp_resultsdir(self, suffix=''):
        # NOTE(review): 'suffix' is accepted but unused -- the temporary
        # path is always named 'agent_task'.  TODO: confirm whether suffix
        # should feed into the path.
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        # preserve this task's log in the results repository, if any
        if self.monitor and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        # timestamp prefix keeps repeated runs of the same task distinct
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        # every entry in a synchronous group must share one execution tag
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_and_parse_results(self, queue_entries):
        """Copy results to the repository and enqueue a final reparse for
        the given (non-empty) group of entries."""
        assert len(queue_entries) > 0
        assert self.monitor
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(self.monitor.get_process(),
                                                  results_path)

        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def run(self):
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001333
1334
1335class RepairTask(AgentTask):
showarde788ea62008-11-17 21:02:47 +00001336 def __init__(self, host, queue_entry=None):
jadmanski0afbb632008-06-06 21:10:57 +00001337 """\
showard170873e2009-01-07 00:22:26 +00001338 queue_entry: queue entry to mark failed if this repair fails.
jadmanski0afbb632008-06-06 21:10:57 +00001339 """
jadmanskifb7cfb12008-07-09 14:13:21 +00001340 protection = host_protections.Protection.get_string(host.protection)
jadmanski542537f2008-07-24 14:14:56 +00001341 # normalize the protection name
1342 protection = host_protections.Protection.get_attr_name(protection)
showard170873e2009-01-07 00:22:26 +00001343
jadmanski0afbb632008-06-06 21:10:57 +00001344 self.host = host
showardccbd6c52009-03-21 00:10:21 +00001345 self.queue_entry_to_fail = queue_entry
1346 # *don't* include the queue entry in IDs -- if the queue entry is
1347 # aborted, we want to leave the repair task running
1348 self._set_ids(host=host)
showard170873e2009-01-07 00:22:26 +00001349
1350 self.create_temp_resultsdir('.repair')
1351 cmd = [_autoserv_path , '-p', '-R', '-m', host.hostname,
1352 '-r', _drone_manager.absolute_path(self.temp_results_dir),
1353 '--host-protection', protection]
1354 super(RepairTask, self).__init__(cmd, self.temp_results_dir)
1355
showard170873e2009-01-07 00:22:26 +00001356 self.set_host_log_file('repair', self.host)
mblighe2586682008-02-29 22:45:46 +00001357
mbligh36768f02008-02-22 18:28:33 +00001358
jadmanski0afbb632008-06-06 21:10:57 +00001359 def prolog(self):
showardb18134f2009-03-20 20:52:18 +00001360 logging.info("repair_task starting")
jadmanski0afbb632008-06-06 21:10:57 +00001361 self.host.set_status('Repairing')
showardccbd6c52009-03-21 00:10:21 +00001362 if self.queue_entry_to_fail:
1363 self.queue_entry_to_fail.requeue()
mbligh36768f02008-02-22 18:28:33 +00001364
1365
showardde634ee2009-01-30 01:44:24 +00001366 def _fail_queue_entry(self):
showardccbd6c52009-03-21 00:10:21 +00001367 assert self.queue_entry_to_fail
1368
1369 if self.queue_entry_to_fail.meta_host:
1370 return # don't fail metahost entries, they'll be reassigned
1371
1372 self.queue_entry_to_fail.update_from_database()
1373 if self.queue_entry_to_fail.status != 'Queued':
1374 return # entry has been aborted
1375
1376 self.queue_entry_to_fail.set_execution_subdir()
showard678df4f2009-02-04 21:36:39 +00001377 # copy results logs into the normal place for job results
1378 _drone_manager.copy_results_on_drone(
1379 self.monitor.get_process(),
1380 source_path=self.temp_results_dir + '/',
showardccbd6c52009-03-21 00:10:21 +00001381 destination_path=self.queue_entry_to_fail.execution_tag() + '/')
showard678df4f2009-02-04 21:36:39 +00001382
showardccbd6c52009-03-21 00:10:21 +00001383 self._copy_and_parse_results([self.queue_entry_to_fail])
1384 self.queue_entry_to_fail.handle_host_failure()
showardde634ee2009-01-30 01:44:24 +00001385
1386
jadmanski0afbb632008-06-06 21:10:57 +00001387 def epilog(self):
1388 super(RepairTask, self).epilog()
1389 if self.success:
1390 self.host.set_status('Ready')
1391 else:
1392 self.host.set_status('Repair Failed')
showardccbd6c52009-03-21 00:10:21 +00001393 if self.queue_entry_to_fail:
showardde634ee2009-01-30 01:44:24 +00001394 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001395
1396
class PreJobTask(AgentTask):
    """Base class for tasks run on a host before its job (verify/cleanup).

    When such a task fails for a real (non-metahost) queue entry, its log
    file is preserved in the results repository so the user can see why
    the entry never ran.
    """

    def epilog(self):
        super(PreJobTask, self).epilog()
        # Only preserve logs for a failed, non-metahost queue entry;
        # metahost entries get reassigned rather than failed.
        if self.queue_entry is None or self.success:
            return
        if self.queue_entry.meta_host:
            return

        self.queue_entry.set_execution_subdir()
        log_name = os.path.basename(self.log_file)
        destination = os.path.join(self.queue_entry.execution_tag(), log_name)
        _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file,
                destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001409
1410
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' to verify a host is in working order.

    Constructed with exactly one of queue_entry (verify before running a
    job) or host (standalone verify).  On failure, a RepairTask is run.
    """

    def __init__(self, queue_entry=None, host=None):
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        verify_cmd = [_autoserv_path, '-p', '-v', '-m', self.host.hostname,
                      '-r',
                      _drone_manager.absolute_path(self.temp_results_dir)]
        on_failure = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(verify_cmd, self.temp_results_dir,
                                         failure_tasks=on_failure)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry is not None:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001441
1442
class QueueTask(AgentTask):
    """Runs autoserv for a job's queue entries and finalizes them.

    Responsible for writing job/host keyvals before and after the run,
    copying and parsing results when the process finishes, and scheduling
    post-job host reboots according to the job's reboot_after setting.
    """

    def __init__(self, job, queue_entries, cmd):
        self.job = job
        self.queue_entries = queue_entries
        super(QueueTask, self).__init__(cmd, self._execution_tag())
        self._set_ids(queue_entries=queue_entries)


    def _format_keyval(self, key, value):
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        return os.path.join(self._execution_tag(), 'keyval')


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict, formatted as key=value lines, at keyval_path."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_tag(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_keyval_after_job(self, field, value):
        # after the job has started, the keyval file lives with the autoserv
        # process's results, so it must be written through the drone manager
        assert self.monitor and self.monitor.has_process()
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _write_host_keyvals(self, host):
        """Record the host's platform and labels under host_keyvals/<host>."""
        keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_tag(self):
        return self.queue_entries[0].execution_tag()


    def prolog(self):
        queued = int(time.mktime(self.job.created_on.timetuple()))
        self._write_keyvals_before_job({'job_queued': queued})
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        error_file_path = os.path.join(self._execution_tag(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self, success):
        """Write final keyvals and parse results; fail entries whose
        autoserv process was lost."""
        if self.monitor.has_process():
            self._write_keyval_after_job("job_finished", int(time.time()))
            self._copy_and_parse_results(self.queue_entries)

        if self.monitor.lost_process:
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        _drone_manager.write_lines_to_file(
            os.path.join(self._execution_tag(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job and when, in keyvals and status.log."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task(False)


    def _reboot_hosts(self):
        """Schedule cleanups (reboots) or release hosts, per reboot_after."""
        reboot_after = self.job.reboot_after
        do_reboot = False
        if reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            num_tests_failed = self.monitor.num_tests_failed()
            do_reboot = (self.success and num_tests_failed == 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.get_host())
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task(self.success)
        self._reboot_hosts()

        # fixed typo: the message used to read "succes=%s"
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001590
1591
class RecoveryQueueTask(QueueTask):
    """A QueueTask that adopts an already-running autoserv process via the
    supplied run_monitor instead of spawning a new one."""

    def __init__(self, job, queue_entries, run_monitor):
        # cmd=None -- no new process is ever launched by this task.
        super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
        self.run_monitor = run_monitor


    def run(self):
        # Re-attach to the existing process rather than executing a command.
        self.monitor = self.run_monitor


    def prolog(self):
        # recovering an existing process - don't do prolog
        pass
mblighbb421852008-03-11 22:36:16 +00001605
1606
class CleanupTask(PreJobTask):
    """Runs 'autoserv --cleanup' on a host; on failure, falls back to a
    RepairTask."""

    def __init__(self, host=None, queue_entry=None):
        # Exactly one of host / queue_entry must be supplied.
        assert bool(host) ^ bool(queue_entry)
        if queue_entry:
            host = queue_entry.get_host()
        self.queue_entry = queue_entry
        self.host = host

        self.create_temp_resultsdir('.cleanup')
        self.cmd = [_autoserv_path, '-p', '--cleanup', '-m', host.hostname,
                    '-r', _drone_manager.absolute_path(self.temp_results_dir)]
        fallback_repair = RepairTask(host, queue_entry=queue_entry)
        super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
                                          failure_tasks=[fallback_repair])

        self._set_ids(host=host, queue_entries=[queue_entry])
        self.set_host_log_file('cleanup', self.host)


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status("Cleaning")


    def epilog(self):
        super(CleanupTask, self).epilog()
        # a successfully cleaned host is ready for work and no longer dirty
        if self.success:
            self.host.set_status('Ready')
            self.host.update_field('dirty', 0)
1637
1638
class AbortTask(AgentTask):
    """Aborts the active tasks of other agents, then marks the queue entry
    as Aborted."""

    def __init__(self, queue_entry, agents_to_abort):
        super(AbortTask, self).__init__('')
        self.queue_entry = queue_entry
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [queue_entry.id]
        self.agents_to_abort = agents_to_abort


    def prolog(self):
        logging.info("starting abort on host %s, job %s",
                     self.queue_entry.host_id, self.queue_entry.job_id)


    def epilog(self):
        super(AbortTask, self).epilog()
        self.queue_entry.set_status('Aborted')
        self.success = True


    def run(self):
        for victim in self.agents_to_abort:
            active = victim.active_task
            if active:
                active.abort()
mbligh36768f02008-02-22 18:28:33 +00001663
1664
class FinalReparseTask(AgentTask):
    """Runs the results parser over a finished job's results directory.

    A class-level counter caps the number of parser processes running at
    once (scheduler_config.config.max_parse_processes); run()/poll() defer
    actually starting the parse until a slot is free.
    """

    # number of parses currently running, shared across all instances
    _num_running_parses = 0

    def __init__(self, queue_entries):
        self._queue_entries = queue_entries
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]
        self._parse_started = False

        assert len(queue_entries) > 0
        queue_entry = queue_entries[0]

        self._execution_tag = queue_entry.execution_tag()
        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
        # attach to the finished autoserv process so we can read its exit
        # status and pair the parser with its results
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
        self._final_status = self._determine_final_status()

        if _testing_mode:
            # NOTE: in testing mode the base-class __init__ is skipped
            # entirely; only self.cmd is set.
            self.cmd = 'true'
        else:
            super(FinalReparseTask, self).__init__(
                cmd=self._generate_parse_command(),
                working_directory=self._execution_tag)

        self.log_file = os.path.join(self._execution_tag, '.parse.log')


    @classmethod
    def _increment_running_parses(cls):
        cls._num_running_parses += 1


    @classmethod
    def _decrement_running_parses(cls):
        cls._num_running_parses -= 1


    @classmethod
    def _can_run_new_parse(cls):
        return (cls._num_running_parses <
                scheduler_config.config.max_parse_processes)


    def _determine_final_status(self):
        """Map the autoserv exit code to a final queue entry status."""
        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def prolog(self):
        super(FinalReparseTask, self).prolog()
        for queue_entry in self._queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        # final status was decided at construction time from autoserv's
        # exit code, not from the parser's outcome
        for queue_entry in self._queue_entries:
            queue_entry.set_status(self._final_status)


    def _generate_parse_command(self):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                self._results_dir]


    def poll(self):
        # override poll to keep trying to start until the parse count goes down
        # and we can, at which point we revert to default behavior
        if self._parse_started:
            super(FinalReparseTask, self).poll()
        else:
            self._try_starting_parse()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_parse()


    def _try_starting_parse(self):
        """Launch the parser if a slot is free and results exist."""
        if not self._can_run_new_parse():
            return

        # make sure we actually have results to parse
        # this should never happen in normal operation
        if not self._autoserv_monitor.has_process():
            email_manager.manager.enqueue_notify_email(
                'No results to parse',
                'No results to parse at %s' % self._autoserv_monitor.pidfile_id)
            self.finished(False)
            return

        # actually run the parse command
        self.monitor = PidfileRunMonitor()
        self.monitor.run(self.cmd, self._working_directory,
                         log_file=self.log_file,
                         pidfile_name='.parser_execute',
                         paired_with_pidfile=self._autoserv_monitor.pidfile_id)

        self._increment_running_parses()
        self._parse_started = True


    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        # only release a slot if we actually consumed one
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001775
1776
class SetEntryPendingTask(AgentTask):
    """Command-less task that transitions a queue entry via on_pending()."""

    def __init__(self, queue_entry):
        super(SetEntryPendingTask, self).__init__(cmd='')
        self._queue_entry = queue_entry
        self._set_ids(queue_entries=[queue_entry])


    def run(self):
        # on_pending() may return a follow-up agent that must be dispatched
        follow_up = self._queue_entry.on_pending()
        if follow_up:
            self.agent.dispatcher.add_agent(follow_up)
        self.finished(True)
1789
1790
class DBError(Exception):
    """Raised when a DBObject cannot be loaded because its database select
    returns no rows."""
1793
1794
class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id. This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id. If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id - Database id to load (mutually exclusive with row).
        @param row - A SELECT result row to initialize from without querying.
        @param new_record - True if this represents a row not yet in the
                database (it will be INSERTed by save()).
        @param always_query - If False and this (type, id) instance was
                already initialized, skip re-querying the database.
        """
        assert bool(id) != bool(row)
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Fetch and return the row with id row_id; raise DBError if absent."""
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            # BUG FIX: the message previously interpolated the builtin id()
            # function instead of the requested row id.
            raise DBError("row not found (table=%s, id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in memory fields.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is never mutated through update_field()
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read this object's row from the database."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table = None):
        """Return COUNT(*) of rows in table (default: our table) matching
        the given WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value, condition=''):
        """Set field = value on this row (and in memory), optionally guarded
        by an extra SQL condition.  No-op when the value is unchanged."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        if condition:
            query += ' AND (%s)' % condition
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object if it was created with new_record=True."""
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            # NOTE(review): values are interpolated unescaped -- safe only
            # for trusted, scheduler-generated values.
            values = ['"%s"' % self.__dict__[key] for key in keys]
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """DELETE this row and invalidate the in-memory instance."""
        # BUG FIX: the cache key previously used the builtin id() function,
        # so the instance was never actually evicted from the cache.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Prepend prefix to string if string is non-empty."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        for row in rows:
            yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00001989
mbligh36768f02008-02-22 18:28:33 +00001990
class IneligibleHostQueue(DBObject):
    """ORM wrapper for a row of the ineligible_host_queues table
    (job_id, host_id pairs)."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00001994
1995
class AtomicGroup(DBObject):
    """ORM wrapper for a row of the atomic_groups table."""
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00002000
2001
class Label(DBObject):
    """ORM wrapper for a row of the labels table."""
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002006
2007
class Host(DBObject):
    """ORM wrapper for a row of the hosts table."""
    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def current_task(self):
        """
        Return the active, incomplete HostQueueEntry on this host, or None
        if the host is idle.
        """
        rows = _db.execute("""
                SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                """, (self.id,))

        if len(rows) == 0:
            return None
        # a host should never have more than one active, incomplete entry
        assert len(rows) == 1
        return HostQueueEntry(row=rows[0])


    def yield_work(self):
        """Requeue this host's active queue entry, if any."""
        logging.info("%s yielding work", self.hostname)
        # query once and reuse -- previously current_task() was called
        # twice, issuing two identical database queries
        task = self.current_task()
        if task:
            task.requeue()


    def set_status(self, status):
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    def reverify_tasks(self):
        """Return [CleanupTask, VerifyTask] to re-validate this host."""
        cleanup_task = CleanupTask(host=self)
        verify_task = VerifyTask(host=self)
        # just to make sure this host does not get taken away
        self.set_status('Cleaning')
        return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002064
2065
class HostQueueEntry(DBObject):
    """
    Representation of a row in the `host_queue_entries` table: the
    assignment of one Job to one Host (or to a meta-host / atomic group
    that has not been bound to a concrete host yet).
    """
    _table_name = 'host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id')


    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        # The owning job always exists; the host may be unset until a
        # meta-host entry is bound at run() time.
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        # Frontend link used in notification emails.
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def set_host(self, host):
        """
        Bind this entry to `host`, or release its host when `host` is None,
        keeping the job's ineligible-host blocks in sync.
        """
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        """Return the bound Host object, or None."""
        return self.host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log file."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Mark host_id as ineligible for further entries of this job."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any ineligibility blocks between host_id and this job."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set the results subdirectory; defaults to the bound host's name."""
        if subdir is None:
            assert self.get_host()
            subdir = self.get_host().hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)


    def set_status(self, status):
        """
        Update this entry's status plus the derived active/complete flags,
        then send any configured notification emails.
        """
        # Never let a non-abort status clobber an abort in progress: the SQL
        # condition turns the update into a no-op in that case.
        abort_statuses = ['Abort', 'Aborting', 'Aborted']
        if status not in abort_statuses:
            condition = ' AND '.join(['status <> "%s"' % x
                                      for x in abort_statuses])
        else:
            condition = ''
        self.update_field('status', status, condition=condition)

        logging.info("%s -> %s", self, self.status)

        if status in ['Queued', 'Parsing']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Aborting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _email_on_status(self, status):
        """Email the job's notification list about this entry's new status."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """Send a per-host summary email once the whole job has finished."""
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def run(self, assigned_host=None):
        """
        Hand this entry to its job for execution.  Meta-host and atomic
        group entries must be given a concrete `assigned_host` here.
        Returns the job's run() result.
        """
        if self.meta_host is not None or self.atomic_group_id is not None:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.set_host(assigned_host)

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        return self.job.run(queue_entry=self)


    def requeue(self):
        """Put this entry back in the queue, releasing any meta-host host."""
        self.set_status('Queued')
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        self.job.stop_if_necessary()


    @property
    def aborted_by(self):
        # Login of the user who aborted this entry, or None.
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # Timestamp of the abort, or None.
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Results are cached on the instance after the first call.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify. If the
        job is ready to run, returns an agent to run the job. Returns None
        otherwise.
        """
        self.set_status('Pending')
        self.get_host().set_status('Pending')
        if self.job.is_ready():
            return self.job.run(self)
        self.job.stop_if_necessary()
        return None


    def abort(self, dispatcher, agents_to_abort=None):
        """
        Abort this entry: schedule reverify tasks for its host (when the
        entry was active) plus an AbortTask to kill `agents_to_abort`.

        @param agents_to_abort: agents for the AbortTask to kill; defaults
                to none.  (A None sentinel replaces the previous mutable []
                default, which would be shared between calls.)
        """
        if agents_to_abort is None:
            agents_to_abort = []
        host = self.get_host()
        if self.active and host:
            dispatcher.add_agent(Agent(tasks=host.reverify_tasks()))

        abort_task = AbortTask(self, agents_to_abort)
        self.set_status('Aborting')
        dispatcher.add_agent(Agent(tasks=[abort_task], num_processes=0))


    def execution_tag(self):
        # "<job id>-<owner>/<subdir>" -- the path fragment identifying this
        # entry's results within the results repository.
        assert self.execution_subdir
        return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002316
2317
class Job(DBObject):
    """
    Representation of a row in the `jobs` table, plus the logic for
    grouping pending entries, generating pre-job tasks and launching
    autoserv.
    """
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after')


    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)


    def is_server_job(self):
        # NOTE(review): control_type 2 appears to denote a client-side job;
        # everything else is treated as a server job -- confirm against the
        # frontend's control type enum.
        return self.control_type != 2


    def tag(self):
        """Results-directory name for this job: "<id>-<owner>"."""
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        """Return all HostQueueEntry objects for this job (at least one)."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
                """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries) > 0

        return entries


    def set_status(self, status, update_queues=False):
        """Set the job status, optionally propagating it to all entries."""
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_ready(self):
        """True when enough entries are Pending to satisfy synch_count."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status='Pending')
        return (pending_entries.count() >= self.synch_count)


    def num_machines(self, clause=None):
        """Count this job's queue entries, optionally filtered by `clause`."""
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        """Django queryset of this job's entries that have not started."""
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Mark all queued/pending entries Stopped, freeing pending hosts."""
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
        """Stop the job if it can no longer reach synch_count entries."""
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        """Append the entry's hostname to the job's .machines file."""
        hostname = queue_entry.get_host().hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self):
        """Return the next unused "groupN" execution subdir for this job."""
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in groups if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return "group%d" % next_id


    def _write_control_file(self, execution_tag):
        """Stage the control file on the drone; returns its path there."""
        control_path = _drone_manager.attach_file_to_execution(
            execution_tag, self.control_file)
        return control_path


    def get_group_entries(self, queue_entry_from_group):
        """Return all entries sharing queue_entry_from_group's subdir."""
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _get_autoserv_params(self, queue_entries):
        """Build the autoserv command line for the given group of entries."""
        assert queue_entries
        execution_tag = queue_entries[0].execution_tag()
        control_path = self._write_control_file(execution_tag)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = [_autoserv_path, '-P', execution_tag, '-p', '-n',
                  '-r', _drone_manager.absolute_path(execution_tag),
                  '-u', self.owner, '-l', self.name, '-m', hostnames,
                  _drone_manager.absolute_path(control_path)]

        if not self.is_server_job():
            params.append('-c')

        return params


    def _should_run_cleanup(self, queue_entry):
        """Whether a CleanupTask is needed, per the reboot_before policy."""
        if self.reboot_before == models.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == models.RebootBefore.IF_DIRTY:
            return queue_entry.get_host().dirty
        return False


    def _should_run_verify(self, queue_entry):
        """Whether a VerifyTask is needed (host protection permitting)."""
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def _get_pre_job_tasks(self, queue_entry):
        """Tasks to run before the job proper: cleanup, verify, pending."""
        tasks = []
        if self._should_run_cleanup(queue_entry):
            tasks.append(CleanupTask(queue_entry=queue_entry))
        if self._should_run_verify(queue_entry):
            tasks.append(VerifyTask(queue_entry=queue_entry))
        tasks.append(SetEntryPendingTask(queue_entry))
        return tasks


    def _assign_new_group(self, queue_entries):
        """Give the entries a shared execution subdir (hostname or groupN)."""
        if len(queue_entries) == 1:
            group_name = queue_entries[0].get_host().hostname
        else:
            group_name = self._next_group_name()
            logging.info('Running synchronous job %d hosts %s as %s',
                         self.id,
                         [entry.host.hostname for entry in queue_entries],
                         group_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_name)


    def _choose_group_to_run(self, include_queue_entry):
        """Pick synch_count pending entries (always including the given one)
        and assign them a group."""
        chosen_entries = [include_queue_entry]

        num_entries_needed = self.synch_count - 1
        if num_entries_needed > 0:
            pending_entries = HostQueueEntry.fetch(
                where='job_id = %s AND status = "Pending" AND id != %s',
                params=(self.id, include_queue_entry.id))
            chosen_entries += list(pending_entries)[:num_entries_needed]

        self._assign_new_group(chosen_entries)
        return chosen_entries


    def run(self, queue_entry):
        """
        Run this job for queue_entry.  If the job is not yet ready, return
        an Agent for the pre-job (verify) tasks; otherwise group the pending
        entries and return an Agent that launches autoserv.
        """
        if not self.is_ready():
            queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
            return Agent(self._get_pre_job_tasks(queue_entry))

        queue_entries = self._choose_group_to_run(queue_entry)
        return self._finish_run(queue_entries)


    def _finish_run(self, queue_entries, initial_tasks=None):
        """
        Mark the entries Starting and return an Agent running initial_tasks
        followed by the QueueTask that executes autoserv.

        @param initial_tasks: tasks to run before the QueueTask; defaults to
                none.  (A None sentinel replaces the previous mutable []
                default, which would be shared between calls.)
        """
        for queue_entry in queue_entries:
            queue_entry.set_status('Starting')
        params = self._get_autoserv_params(queue_entries)
        queue_task = QueueTask(job=self, queue_entries=queue_entries,
                               cmd=params)
        # Dead local `entry_ids` from the original was removed here.
        tasks = (initial_tasks or []) + [queue_task]

        return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002538
2539
if __name__ == '__main__':
    # Script entry point; main() is defined earlier in this file, outside
    # this excerpt.
    main()