blob: 3e9e95d26b30268c857255895d82bb1949963da2 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard89f84db2009-03-12 20:39:13 +00008import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard21baa452008-10-21 00:08:39 +000017from autotest_lib.frontend.afe import models
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
showardd1ee1dd2009-01-07 21:33:08 +000019from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000020
mblighb090f142008-02-27 21:33:46 +000021
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# allow the install location to be overridden from the environment
# (dict.has_key is deprecated -- use the 'in' operator instead)
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# module-level scheduler state, configured in main()/init()
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000054
# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.  (dict.has_key is deprecated -- use the 'in' operator instead.)
if 'AUTOTEST_SCHEDULER_LOG_NAME' in os.environ:
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
67
mbligh36768f02008-02-22 18:28:33 +000068
def main():
    """Entry point: parse options, initialize subsystems, run the dispatcher.

    Expects exactly one positional argument: the results directory.  Runs
    the tick loop until a shutdown is requested (see handle_sigint) or an
    exception escapes, then flushes email, stops the status server, shuts
    down the drone manager and disconnects from the database.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # config value is a free-form list separated by whitespace/,/;/:
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    # NOTE(review): bare except also swallows KeyboardInterrupt/SystemExit so
    # that the cleanup below always runs -- confirm that is intentional
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000144
145
def handle_sigint(signum, frame):
    """SIGINT handler: ask the main loop to shut down cleanly.

    Only flips the module-level _shutdown flag; the dispatcher loop in
    main() notices it on its next tick.
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000150
151
def init(logfile):
    """Initialize scheduler subsystems before the main loop starts.

    @param logfile - optional path; when set, stdout/stderr are redirected
            to it via enable_logging().

    Side effects: writes a pid file, connects the global _db, enables
    Django autocommit, installs the SIGINT handler and initializes the
    global drone manager from config.
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # point the DB layer at the stress-test database instead of prod
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # NOTE(review): this local 'drones' shadows the imported drones module
    # for the rest of this function
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000183
184
def enable_logging(logfile):
    """Redirect this process's stdout/stderr to logfile and logfile.err.

    Duplicates the file descriptors with os.dup2 (so output from child
    processes inherits the redirection too) and then rebinds sys.stdout /
    sys.stderr to the new file objects.
    """
    out_file = logfile
    err_file = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", out_file, err_file)
    # buffering=0 -> unbuffered append; text-mode unbuffered open is a
    # Python 2 behavior
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd
mbligh36768f02008-02-22 18:28:33 +0000197
198
def queue_entries_to_abort():
    """Return a HostQueueEntry for every row currently in 'Abort' status."""
    rows = _db.execute("""
        SELECT * FROM host_queue_entries WHERE status='Abort';
        """)

    return [HostQueueEntry(row=row) for row in rows]
mbligh36768f02008-02-22 18:28:33 +0000206
showard7cf9a9b2008-05-15 21:15:52 +0000207
class SchedulerError(Exception):
    """Error raised when HostScheduler detects an inconsistent state."""
210
211
class HostScheduler(object):
    """Matches pending HostQueueEntries against ready hosts.

    refresh() loads a per-cycle snapshot of hosts, ACLs, labels and job
    dependencies from the database; the scheduling methods then consume
    hosts from that snapshot (popping them from _hosts_available) so each
    host is handed out at most once per scheduling cycle.
    """
    def _get_ready_hosts(self):
        """Return {host_id: Host} for every unlocked, Ready, inactive host."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        # render ids as "1,2,3" for interpolation into SQL IN (...) clauses
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run query (its %s filled with id_list) and build an id -> set map."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold (left_id, right_id) rows into {left_id: set(right_ids)}.

        With flip=True the mapping direction is reversed.
        """
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map job id -> set of ACL group ids the job owner belongs to."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map job id -> set of host ids the job must not be scheduled on."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map job id -> set of label ids the job depends on."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map host id -> set of ACL group ids granting access to the host."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return both directions of the host/label relation.

        @returns (label_id -> host_id set, host_id -> label_id set).
        """
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for all labels."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """(Re)load the scheduling snapshot for this cycle from the DB."""
        self._hosts_available = self._get_ready_hosts()

        # only load job-side data for the jobs actually pending this cycle
        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        # the job owner and the host must share at least one ACL group
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels satisfy every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host carries an only_if_needed label that the job
        neither depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group is found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True if the host passes ACL, dependency, only_if_needed and
        atomic-group checks for the entry's job."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """Claim the specific host the entry requested, if eligible."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim any eligible host carrying the entry's metahost label,
        or return None if none is available."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Find and claim a host for this (non-atomic-group) entry."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = random.sample(
                        eligible_hosts_in_group, max_hosts)

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
556
557
showard170873e2009-01-07 00:22:26 +0000558class Dispatcher(object):
    def __init__(self):
        # all active Agents, plus indexes from host id / queue entry id to
        # the set of agents currently working on that object
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000565
mbligh36768f02008-02-22 18:28:33 +0000566
    def do_initial_recovery(self, recover_hosts=True):
        """One-time startup recovery of processes and (optionally) hosts."""
        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000573
574
    def tick(self):
        """Run one iteration of the scheduler main loop: refresh drone
        state, clean up, abort, schedule, drive agents, flush email."""
        _drone_manager.refresh()
        self._run_cleanup_maybe()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000583
showard97aed502008-11-04 02:01:24 +0000584
    def _run_cleanup_maybe(self):
        """Run the periodic cleanup tasks if clean_interval minutes have
        elapsed since the last run."""
        should_cleanup = (self._last_clean_time +
                          scheduler_config.config.clean_interval * 60 <
                          time.time())
        if should_cleanup:
            logging.info('Running cleanup')
            self._abort_timed_out_jobs()
            self._abort_jobs_past_synch_start_timeout()
            self._clear_inactive_blocks()
            self._check_for_db_inconsistencies()
            self._last_clean_time = time.time()
596
mbligh36768f02008-02-22 18:28:33 +0000597
showard170873e2009-01-07 00:22:26 +0000598 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
599 for object_id in object_ids:
600 agent_dict.setdefault(object_id, set()).add(agent)
601
602
603 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
604 for object_id in object_ids:
605 assert object_id in agent_dict
606 agent_dict[object_id].remove(agent)
607
608
    def add_agent(self, agent):
        """Start tracking agent and index it by its host ids and queue
        entry ids so later lookups can find it."""
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000615
showard170873e2009-01-07 00:22:26 +0000616
617 def get_agents_for_entry(self, queue_entry):
618 """
619 Find agents corresponding to the specified queue_entry.
620 """
621 return self._queue_entry_agents.get(queue_entry.id, set())
622
623
624 def host_has_agent(self, host):
625 """
626 Determine if there is currently an Agent present using this host.
627 """
628 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000629
630
    def remove_agent(self, agent):
        """Stop tracking agent and remove it from the host / queue entry
        indexes (inverse of add_agent)."""
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000637
638
showard4c5374f2008-09-04 17:02:56 +0000639 def num_running_processes(self):
640 return sum(agent.num_processes for agent in self._agents
641 if agent.is_running())
mblighbb421852008-03-11 22:36:16 +0000642
643
showard170873e2009-01-07 00:22:26 +0000644 def _extract_execution_tag(self, command_line):
645 match = re.match(r'.* -P (\S+) ', command_line)
646 if not match:
647 return None
648 return match.group(1)
mblighbb421852008-03-11 22:36:16 +0000649
650
    def _recover_queue_entries(self, queue_entries, run_monitor):
        """Re-attach an Agent (wrapping a RecoveryQueueTask) to a group of
        queue entries whose autoserv process is already running."""
        assert len(queue_entries) > 0
        queue_task = RecoveryQueueTask(job=queue_entries[0].job,
                                       queue_entries=queue_entries,
                                       run_monitor=run_monitor)
        self.add_agent(Agent(tasks=[queue_task],
                             num_processes=len(queue_entries)))
mblighbb421852008-03-11 22:36:16 +0000658
659
    def _recover_processes(self):
        """Recover scheduler state after a restart: re-attach to surviving
        autoserv processes, resume aborts, requeue everything else and
        reverify leftover hosts."""
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_running_entries()
        self._recover_aborting_entries()
        self._requeue_other_active_entries()
        self._recover_parsing_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000672
showard170873e2009-01-07 00:22:26 +0000673
    def _register_pidfiles(self):
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
            where="status IN ('Running', 'Parsing')")
        for queue_entry in queue_entries:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                queue_entry.execution_tag())
            _drone_manager.register_pidfile(pidfile_id)
683
684
685 def _recover_running_entries(self):
686 orphans = _drone_manager.get_orphaned_autoserv_processes()
687
688 queue_entries = HostQueueEntry.fetch(where="status = 'Running'")
689 requeue_entries = []
690 for queue_entry in queue_entries:
691 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000692 # synchronous job we've already recovered
693 continue
showard170873e2009-01-07 00:22:26 +0000694 execution_tag = queue_entry.execution_tag()
695 run_monitor = PidfileRunMonitor()
696 run_monitor.attach_to_existing_process(execution_tag)
697 if not run_monitor.has_process():
698 # autoserv apparently never got run, so let it get requeued
699 continue
showarde788ea62008-11-17 21:02:47 +0000700 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showardb18134f2009-03-20 20:52:18 +0000701 logging.info('Recovering %s (process %s)',
702 (', '.join(str(entry) for entry in queue_entries),
703 run_monitor.get_process()))
showard2bab8f42008-11-12 18:15:22 +0000704 self._recover_queue_entries(queue_entries, run_monitor)
showard170873e2009-01-07 00:22:26 +0000705 orphans.pop(execution_tag, None)
mbligh90a549d2008-03-25 23:52:34 +0000706
jadmanski0afbb632008-06-06 21:10:57 +0000707 # now kill any remaining autoserv processes
showard170873e2009-01-07 00:22:26 +0000708 for process in orphans.itervalues():
showardb18134f2009-03-20 20:52:18 +0000709 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000710 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000711
showard170873e2009-01-07 00:22:26 +0000712
713 def _recover_aborting_entries(self):
714 queue_entries = HostQueueEntry.fetch(
715 where='status IN ("Abort", "Aborting")')
jadmanski0afbb632008-06-06 21:10:57 +0000716 for queue_entry in queue_entries:
showardb18134f2009-03-20 20:52:18 +0000717 logging.info('Recovering aborting QE %s', queue_entry)
showard170873e2009-01-07 00:22:26 +0000718 agent = queue_entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000719
showard97aed502008-11-04 02:01:24 +0000720
showard170873e2009-01-07 00:22:26 +0000721 def _requeue_other_active_entries(self):
722 queue_entries = HostQueueEntry.fetch(
723 where='active AND NOT complete AND status != "Pending"')
724 for queue_entry in queue_entries:
725 if self.get_agents_for_entry(queue_entry):
726 # entry has already been recovered
727 continue
showardb18134f2009-03-20 20:52:18 +0000728 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
729 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000730 if queue_entry.host:
731 tasks = queue_entry.host.reverify_tasks()
732 self.add_agent(Agent(tasks))
733 agent = queue_entry.requeue()
734
735
    def _reverify_remaining_hosts(self):
        """Reverify hosts stranded in transient states by a restart."""
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000751
752
jadmanski0afbb632008-06-06 21:10:57 +0000753 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000754 print_message='Reverifying host %s'):
755 full_where='locked = 0 AND invalid = 0 AND ' + where
756 for host in Host.fetch(where=full_where):
757 if self.host_has_agent(host):
758 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000759 continue
showard170873e2009-01-07 00:22:26 +0000760 if print_message:
showardb18134f2009-03-20 20:52:18 +0000761 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000762 tasks = host.reverify_tasks()
763 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000764
765
showard97aed502008-11-04 02:01:24 +0000766 def _recover_parsing_entries(self):
showard2bab8f42008-11-12 18:15:22 +0000767 recovered_entry_ids = set()
showard97aed502008-11-04 02:01:24 +0000768 for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
showard2bab8f42008-11-12 18:15:22 +0000769 if entry.id in recovered_entry_ids:
770 continue
771 queue_entries = entry.job.get_group_entries(entry)
showard170873e2009-01-07 00:22:26 +0000772 recovered_entry_ids = recovered_entry_ids.union(
773 entry.id for entry in queue_entries)
showardb18134f2009-03-20 20:52:18 +0000774 logging.info('Recovering parsing entries %s',
775 (', '.join(str(entry) for entry in queue_entries)))
showard97aed502008-11-04 02:01:24 +0000776
777 reparse_task = FinalReparseTask(queue_entries)
showard170873e2009-01-07 00:22:26 +0000778 self.add_agent(Agent([reparse_task], num_processes=0))
showard97aed502008-11-04 02:01:24 +0000779
780
jadmanski0afbb632008-06-06 21:10:57 +0000781 def _recover_hosts(self):
782 # recover "Repair Failed" hosts
783 message = 'Reverifying dead host %s'
784 self._reverify_hosts_where("status = 'Repair Failed'",
785 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000786
787
showard3bb499f2008-07-03 19:42:20 +0000788 def _abort_timed_out_jobs(self):
789 """
790 Aborts all jobs that have timed out and not completed
791 """
showarda3ab0d52008-11-03 19:03:47 +0000792 query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
793 where=['created_on + INTERVAL timeout HOUR < NOW()'])
794 for job in query.distinct():
showardb18134f2009-03-20 20:52:18 +0000795 logging.warning('Aborting job %d due to job timeout', job.id)
showarda3ab0d52008-11-03 19:03:47 +0000796 job.abort(None)
showard3bb499f2008-07-03 19:42:20 +0000797
798
showard98863972008-10-29 21:14:56 +0000799 def _abort_jobs_past_synch_start_timeout(self):
800 """
801 Abort synchronous jobs that are past the start timeout (from global
802 config) and are holding a machine that's in everyone.
803 """
804 timeout_delta = datetime.timedelta(
showardd1ee1dd2009-01-07 21:33:08 +0000805 minutes=scheduler_config.config.synch_job_start_timeout_minutes)
showard98863972008-10-29 21:14:56 +0000806 timeout_start = datetime.datetime.now() - timeout_delta
807 query = models.Job.objects.filter(
showard98863972008-10-29 21:14:56 +0000808 created_on__lt=timeout_start,
809 hostqueueentry__status='Pending',
showardd9ac4452009-02-07 02:04:37 +0000810 hostqueueentry__host__aclgroup__name='Everyone')
showard98863972008-10-29 21:14:56 +0000811 for job in query.distinct():
showardb18134f2009-03-20 20:52:18 +0000812 logging.warning('Aborting job %d due to start timeout', job.id)
showardff059d72008-12-03 18:18:53 +0000813 entries_to_abort = job.hostqueueentry_set.exclude(
814 status=models.HostQueueEntry.Status.RUNNING)
815 for queue_entry in entries_to_abort:
816 queue_entry.abort(None)
showard98863972008-10-29 21:14:56 +0000817
818
jadmanski0afbb632008-06-06 21:10:57 +0000819 def _clear_inactive_blocks(self):
820 """
821 Clear out blocks for all completed jobs.
822 """
823 # this would be simpler using NOT IN (subquery), but MySQL
824 # treats all IN subqueries as dependent, so this optimizes much
825 # better
826 _db.execute("""
827 DELETE ihq FROM ineligible_host_queues ihq
showard4eaaf522008-06-06 22:28:07 +0000828 LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
jadmanski0afbb632008-06-06 21:10:57 +0000829 WHERE NOT complete) hqe
830 USING (job_id) WHERE hqe.job_id IS NULL""")
showard04c82c52008-05-29 19:38:12 +0000831
832
showardb95b1bd2008-08-15 18:11:04 +0000833 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000834 # prioritize by job priority, then non-metahost over metahost, then FIFO
835 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000836 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000837 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000838 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000839
840
showard89f84db2009-03-12 20:39:13 +0000841 def _refresh_pending_queue_entries(self):
842 """
843 Lookup the pending HostQueueEntries and call our HostScheduler
844 refresh() method given that list. Return the list.
845
846 @returns A list of pending HostQueueEntries sorted in priority order.
847 """
showard63a34772008-08-18 19:32:50 +0000848 queue_entries = self._get_pending_queue_entries()
849 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000850 return []
showardb95b1bd2008-08-15 18:11:04 +0000851
showard63a34772008-08-18 19:32:50 +0000852 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000853
showard89f84db2009-03-12 20:39:13 +0000854 return queue_entries
855
856
857 def _schedule_atomic_group(self, queue_entry):
858 """
859 Schedule the given queue_entry on an atomic group of hosts.
860
861 Returns immediately if there are insufficient available hosts.
862
863 Creates new HostQueueEntries based off of queue_entry for the
864 scheduled hosts and starts them all running.
865 """
866 # This is a virtual host queue entry representing an entire
867 # atomic group, find a group and schedule their hosts.
868 group_hosts = self._host_scheduler.find_eligible_atomic_group(
869 queue_entry)
870 if not group_hosts:
871 return
872 # The first assigned host uses the original HostQueueEntry
873 group_queue_entries = [queue_entry]
874 for assigned_host in group_hosts[1:]:
875 # Create a new HQE for every additional assigned_host.
876 new_hqe = HostQueueEntry.clone(queue_entry)
877 new_hqe.save()
878 group_queue_entries.append(new_hqe)
879 assert len(group_queue_entries) == len(group_hosts)
880 for queue_entry, host in itertools.izip(group_queue_entries,
881 group_hosts):
882 self._run_queue_entry(queue_entry, host)
883
884
885 def _schedule_new_jobs(self):
886 queue_entries = self._refresh_pending_queue_entries()
887 if not queue_entries:
888 return
889
showard63a34772008-08-18 19:32:50 +0000890 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000891 if (queue_entry.atomic_group_id is None or
892 queue_entry.host_id is not None):
893 assigned_host = self._host_scheduler.find_eligible_host(
894 queue_entry)
895 if assigned_host:
896 self._run_queue_entry(queue_entry, assigned_host)
897 else:
898 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000899
900
901 def _run_queue_entry(self, queue_entry, host):
902 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000903 # in some cases (synchronous jobs with run_verify=False), agent may be
904 # None
showard9976ce92008-10-15 20:28:13 +0000905 if agent:
906 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000907
908
jadmanski0afbb632008-06-06 21:10:57 +0000909 def _find_aborting(self):
jadmanski0afbb632008-06-06 21:10:57 +0000910 for entry in queue_entries_to_abort():
showard170873e2009-01-07 00:22:26 +0000911 agents_to_abort = list(self.get_agents_for_entry(entry))
showard1be97432008-10-17 15:30:45 +0000912 for agent in agents_to_abort:
913 self.remove_agent(agent)
914
showard170873e2009-01-07 00:22:26 +0000915 entry.abort(self, agents_to_abort)
jadmanski0afbb632008-06-06 21:10:57 +0000916
917
showard324bf812009-01-20 23:23:38 +0000918 def _can_start_agent(self, agent, num_started_this_cycle,
919 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000920 # always allow zero-process agents to run
921 if agent.num_processes == 0:
922 return True
923 # don't allow any nonzero-process agents to run after we've reached a
924 # limit (this avoids starvation of many-process agents)
925 if have_reached_limit:
926 return False
927 # total process throttling
showard324bf812009-01-20 23:23:38 +0000928 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000929 return False
930 # if a single agent exceeds the per-cycle throttling, still allow it to
931 # run when it's the first agent in the cycle
932 if num_started_this_cycle == 0:
933 return True
934 # per-cycle throttling
935 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000936 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000937 return False
938 return True
939
940
jadmanski0afbb632008-06-06 21:10:57 +0000941 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000942 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000943 have_reached_limit = False
944 # iterate over copy, so we can remove agents during iteration
945 for agent in list(self._agents):
946 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000947 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000948 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000949 continue
950 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000951 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000952 have_reached_limit):
953 have_reached_limit = True
954 continue
showard4c5374f2008-09-04 17:02:56 +0000955 num_started_this_cycle += agent.num_processes
956 agent.tick()
showardb18134f2009-03-20 20:52:18 +0000957 logging.info('%d running processes',
958 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000959
960
showardfa8629c2008-11-04 16:51:23 +0000961 def _check_for_db_inconsistencies(self):
962 query = models.HostQueueEntry.objects.filter(active=True, complete=True)
963 if query.count() != 0:
964 subject = ('%d queue entries found with active=complete=1'
965 % query.count())
966 message = '\n'.join(str(entry.get_object_dict())
967 for entry in query[:50])
968 if len(query) > 50:
969 message += '\n(truncated)\n'
970
showardb18134f2009-03-20 20:52:18 +0000971 logging.error(subject)
showard170873e2009-01-07 00:22:26 +0000972 email_manager.manager.enqueue_notify_email(subject, message)
showardfa8629c2008-11-04 16:51:23 +0000973
974
showard170873e2009-01-07 00:22:26 +0000975class PidfileRunMonitor(object):
976 """
977 Client must call either run() to start a new process or
978 attach_to_existing_process().
979 """
mbligh36768f02008-02-22 18:28:33 +0000980
showard170873e2009-01-07 00:22:26 +0000981 class _PidfileException(Exception):
982 """
983 Raised when there's some unexpected behavior with the pid file, but only
984 used internally (never allowed to escape this class).
985 """
mbligh36768f02008-02-22 18:28:33 +0000986
987
showard170873e2009-01-07 00:22:26 +0000988 def __init__(self):
showard35162b02009-03-03 02:17:30 +0000989 self.lost_process = False
showard170873e2009-01-07 00:22:26 +0000990 self._start_time = None
991 self.pidfile_id = None
992 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +0000993
994
showard170873e2009-01-07 00:22:26 +0000995 def _add_nice_command(self, command, nice_level):
996 if not nice_level:
997 return command
998 return ['nice', '-n', str(nice_level)] + command
999
1000
1001 def _set_start_time(self):
1002 self._start_time = time.time()
1003
1004
1005 def run(self, command, working_directory, nice_level=None, log_file=None,
1006 pidfile_name=None, paired_with_pidfile=None):
1007 assert command is not None
1008 if nice_level is not None:
1009 command = ['nice', '-n', str(nice_level)] + command
1010 self._set_start_time()
1011 self.pidfile_id = _drone_manager.execute_command(
1012 command, working_directory, log_file=log_file,
1013 pidfile_name=pidfile_name, paired_with_pidfile=paired_with_pidfile)
1014
1015
1016 def attach_to_existing_process(self, execution_tag):
1017 self._set_start_time()
1018 self.pidfile_id = _drone_manager.get_pidfile_id_from(execution_tag)
1019 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001020
1021
jadmanski0afbb632008-06-06 21:10:57 +00001022 def kill(self):
showard170873e2009-01-07 00:22:26 +00001023 if self.has_process():
1024 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001025
mbligh36768f02008-02-22 18:28:33 +00001026
showard170873e2009-01-07 00:22:26 +00001027 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001028 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001029 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001030
1031
showard170873e2009-01-07 00:22:26 +00001032 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001033 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001034 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001035 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001036
1037
showard170873e2009-01-07 00:22:26 +00001038 def _read_pidfile(self, use_second_read=False):
1039 assert self.pidfile_id is not None, (
1040 'You must call run() or attach_to_existing_process()')
1041 contents = _drone_manager.get_pidfile_contents(
1042 self.pidfile_id, use_second_read=use_second_read)
1043 if contents.is_invalid():
1044 self._state = drone_manager.PidfileContents()
1045 raise self._PidfileException(contents)
1046 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001047
1048
showard21baa452008-10-21 00:08:39 +00001049 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001050 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1051 self._state.process, self.pidfile_id, message)
showardb18134f2009-03-20 20:52:18 +00001052 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001053 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001054 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001055
1056
1057 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001058 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001059 return
mblighbb421852008-03-11 22:36:16 +00001060
showard21baa452008-10-21 00:08:39 +00001061 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001062
showard170873e2009-01-07 00:22:26 +00001063 if self._state.process is None:
1064 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001065 return
mbligh90a549d2008-03-25 23:52:34 +00001066
showard21baa452008-10-21 00:08:39 +00001067 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001068 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001069 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001070 return
mbligh90a549d2008-03-25 23:52:34 +00001071
showard170873e2009-01-07 00:22:26 +00001072 # pid but no running process - maybe process *just* exited
1073 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001074 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001075 # autoserv exited without writing an exit code
1076 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001077 self._handle_pidfile_error(
1078 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001079
showard21baa452008-10-21 00:08:39 +00001080
1081 def _get_pidfile_info(self):
1082 """\
1083 After completion, self._state will contain:
1084 pid=None, exit_status=None if autoserv has not yet run
1085 pid!=None, exit_status=None if autoserv is running
1086 pid!=None, exit_status!=None if autoserv has completed
1087 """
1088 try:
1089 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001090 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001091 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001092
1093
showard170873e2009-01-07 00:22:26 +00001094 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001095 """\
1096 Called when no pidfile is found or no pid is in the pidfile.
1097 """
showard170873e2009-01-07 00:22:26 +00001098 message = 'No pid found at %s' % self.pidfile_id
showardb18134f2009-03-20 20:52:18 +00001099 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001100 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1101 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001102 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001103 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001104
1105
showard35162b02009-03-03 02:17:30 +00001106 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001107 """\
1108 Called when autoserv has exited without writing an exit status,
1109 or we've timed out waiting for autoserv to write a pid to the
1110 pidfile. In either case, we just return failure and the caller
1111 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001112
showard170873e2009-01-07 00:22:26 +00001113 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001114 """
1115 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001116 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001117 self._state.exit_status = 1
1118 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001119
1120
jadmanski0afbb632008-06-06 21:10:57 +00001121 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001122 self._get_pidfile_info()
1123 return self._state.exit_status
1124
1125
1126 def num_tests_failed(self):
1127 self._get_pidfile_info()
1128 assert self._state.num_tests_failed is not None
1129 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001130
1131
mbligh36768f02008-02-22 18:28:33 +00001132class Agent(object):
showard170873e2009-01-07 00:22:26 +00001133 def __init__(self, tasks, num_processes=1):
jadmanski0afbb632008-06-06 21:10:57 +00001134 self.active_task = None
1135 self.queue = Queue.Queue(0)
1136 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001137 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001138
showard170873e2009-01-07 00:22:26 +00001139 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1140 for task in tasks)
1141 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1142
jadmanski0afbb632008-06-06 21:10:57 +00001143 for task in tasks:
1144 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001145
1146
showard170873e2009-01-07 00:22:26 +00001147 def _union_ids(self, id_lists):
1148 return set(itertools.chain(*id_lists))
1149
1150
jadmanski0afbb632008-06-06 21:10:57 +00001151 def add_task(self, task):
1152 self.queue.put_nowait(task)
1153 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001154
1155
jadmanski0afbb632008-06-06 21:10:57 +00001156 def tick(self):
showard21baa452008-10-21 00:08:39 +00001157 while not self.is_done():
1158 if self.active_task and not self.active_task.is_done():
1159 self.active_task.poll()
1160 if not self.active_task.is_done():
1161 return
1162 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001163
1164
jadmanski0afbb632008-06-06 21:10:57 +00001165 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001166 logging.info("agent picking task")
jadmanski0afbb632008-06-06 21:10:57 +00001167 if self.active_task:
1168 assert self.active_task.is_done()
mbligh36768f02008-02-22 18:28:33 +00001169
jadmanski0afbb632008-06-06 21:10:57 +00001170 if not self.active_task.success:
1171 self.on_task_failure()
mblighe2586682008-02-29 22:45:46 +00001172
jadmanski0afbb632008-06-06 21:10:57 +00001173 self.active_task = None
1174 if not self.is_done():
1175 self.active_task = self.queue.get_nowait()
1176 if self.active_task:
1177 self.active_task.start()
mbligh36768f02008-02-22 18:28:33 +00001178
1179
jadmanski0afbb632008-06-06 21:10:57 +00001180 def on_task_failure(self):
1181 self.queue = Queue.Queue(0)
showardccbd6c52009-03-21 00:10:21 +00001182 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1183 # get reset.
1184 new_agent = Agent(self.active_task.failure_tasks)
1185 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001186
mblighe2586682008-02-29 22:45:46 +00001187
showard4c5374f2008-09-04 17:02:56 +00001188 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001189 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001190
1191
jadmanski0afbb632008-06-06 21:10:57 +00001192 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001193 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001194
1195
jadmanski0afbb632008-06-06 21:10:57 +00001196 def start(self):
1197 assert self.dispatcher
mbligh36768f02008-02-22 18:28:33 +00001198
jadmanski0afbb632008-06-06 21:10:57 +00001199 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001200
jadmanski0afbb632008-06-06 21:10:57 +00001201
mbligh36768f02008-02-22 18:28:33 +00001202class AgentTask(object):
showard170873e2009-01-07 00:22:26 +00001203 def __init__(self, cmd, working_directory=None, failure_tasks=[]):
jadmanski0afbb632008-06-06 21:10:57 +00001204 self.done = False
1205 self.failure_tasks = failure_tasks
1206 self.started = False
1207 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001208 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001209 self.task = None
1210 self.agent = None
1211 self.monitor = None
1212 self.success = None
showard170873e2009-01-07 00:22:26 +00001213 self.queue_entry_ids = []
1214 self.host_ids = []
1215 self.log_file = None
1216
1217
1218 def _set_ids(self, host=None, queue_entries=None):
1219 if queue_entries and queue_entries != [None]:
1220 self.host_ids = [entry.host.id for entry in queue_entries]
1221 self.queue_entry_ids = [entry.id for entry in queue_entries]
1222 else:
1223 assert host
1224 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001225
1226
jadmanski0afbb632008-06-06 21:10:57 +00001227 def poll(self):
jadmanski0afbb632008-06-06 21:10:57 +00001228 if self.monitor:
1229 self.tick(self.monitor.exit_code())
1230 else:
1231 self.finished(False)
mbligh36768f02008-02-22 18:28:33 +00001232
1233
jadmanski0afbb632008-06-06 21:10:57 +00001234 def tick(self, exit_code):
showard170873e2009-01-07 00:22:26 +00001235 if exit_code is None:
jadmanski0afbb632008-06-06 21:10:57 +00001236 return
jadmanski0afbb632008-06-06 21:10:57 +00001237 if exit_code == 0:
1238 success = True
1239 else:
1240 success = False
mbligh36768f02008-02-22 18:28:33 +00001241
jadmanski0afbb632008-06-06 21:10:57 +00001242 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001243
1244
jadmanski0afbb632008-06-06 21:10:57 +00001245 def is_done(self):
1246 return self.done
mbligh36768f02008-02-22 18:28:33 +00001247
1248
jadmanski0afbb632008-06-06 21:10:57 +00001249 def finished(self, success):
1250 self.done = True
1251 self.success = success
1252 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001253
1254
jadmanski0afbb632008-06-06 21:10:57 +00001255 def prolog(self):
1256 pass
mblighd64e5702008-04-04 21:39:28 +00001257
1258
jadmanski0afbb632008-06-06 21:10:57 +00001259 def create_temp_resultsdir(self, suffix=''):
showard170873e2009-01-07 00:22:26 +00001260 self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
mblighd64e5702008-04-04 21:39:28 +00001261
mbligh36768f02008-02-22 18:28:33 +00001262
jadmanski0afbb632008-06-06 21:10:57 +00001263 def cleanup(self):
showard170873e2009-01-07 00:22:26 +00001264 if self.monitor and self.log_file:
1265 _drone_manager.copy_to_results_repository(
1266 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001267
1268
jadmanski0afbb632008-06-06 21:10:57 +00001269 def epilog(self):
1270 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001271
1272
jadmanski0afbb632008-06-06 21:10:57 +00001273 def start(self):
1274 assert self.agent
1275
1276 if not self.started:
1277 self.prolog()
1278 self.run()
1279
1280 self.started = True
1281
1282
1283 def abort(self):
1284 if self.monitor:
1285 self.monitor.kill()
1286 self.done = True
1287 self.cleanup()
1288
1289
showard170873e2009-01-07 00:22:26 +00001290 def set_host_log_file(self, base_name, host):
1291 filename = '%s.%s' % (time.time(), base_name)
1292 self.log_file = os.path.join('hosts', host.hostname, filename)
1293
1294
showardde634ee2009-01-30 01:44:24 +00001295 def _get_consistent_execution_tag(self, queue_entries):
1296 first_execution_tag = queue_entries[0].execution_tag()
1297 for queue_entry in queue_entries[1:]:
1298 assert queue_entry.execution_tag() == first_execution_tag, (
1299 '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
1300 queue_entry,
1301 first_execution_tag,
1302 queue_entries[0]))
1303 return first_execution_tag
1304
1305
showard678df4f2009-02-04 21:36:39 +00001306 def _copy_and_parse_results(self, queue_entries):
showardde634ee2009-01-30 01:44:24 +00001307 assert len(queue_entries) > 0
1308 assert self.monitor
1309 execution_tag = self._get_consistent_execution_tag(queue_entries)
showard678df4f2009-02-04 21:36:39 +00001310 results_path = execution_tag + '/'
1311 _drone_manager.copy_to_results_repository(self.monitor.get_process(),
1312 results_path)
showardde634ee2009-01-30 01:44:24 +00001313
1314 reparse_task = FinalReparseTask(queue_entries)
1315 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1316
1317
jadmanski0afbb632008-06-06 21:10:57 +00001318 def run(self):
1319 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001320 self.monitor = PidfileRunMonitor()
1321 self.monitor.run(self.cmd, self._working_directory,
1322 nice_level=AUTOSERV_NICE_LEVEL,
1323 log_file=self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001324
1325
class RepairTask(AgentTask):
    """Runs autoserv -R to repair a host; on failure, fails the queue entry."""

    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        # repair results go to a temporary dir; copied into the job's
        # results only if the repair fails a real entry
        self.create_temp_resultsdir('.repair')
        cmd = [_autoserv_path , '-p', '-R', '-m', host.hostname,
               '-r', _drone_manager.absolute_path(self.temp_results_dir),
               '--host-protection', protection]
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # requeue the entry up front so it isn't stuck on this host
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _fail_queue_entry(self):
        # mark the associated entry failed after an unsuccessful repair
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_and_parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001386
1387
class PreJobTask(AgentTask):
    """Base class for tasks run against a host before a job (verify/cleanup).

    When such a task fails for a non-metahost entry, its log file is copied
    into the entry's results so the failure is visible with the job.
    """
    def epilog(self):
        super(PreJobTask, self).epilog()
        # nothing to preserve on success, and metahost entries are simply
        # rescheduled elsewhere
        if self.success or not self.queue_entry:
            return
        if self.queue_entry.meta_host:
            return
        self.queue_entry.set_execution_subdir()
        log_name = os.path.basename(self.log_file)
        destination = os.path.join(self.queue_entry.execution_tag(), log_name)
        _drone_manager.copy_to_results_repository(
            self.monitor.get_process(), self.log_file,
            destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001400
1401
class VerifyTask(PreJobTask):
    """Runs autoserv -v to check that a host is in working order.

    Exactly one of queue_entry or host must be supplied; on failure a
    RepairTask is scheduled via failure_tasks.
    """
    def __init__(self, queue_entry=None, host=None):
        # exactly one of the two arguments must be given
        assert bool(queue_entry) != bool(host)
        if host:
            self.host = host
        else:
            self.host = queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        results_path = _drone_manager.absolute_path(self.temp_results_dir)
        verify_cmd = [_autoserv_path, '-p', '-v', '-m', self.host.hostname,
                      '-r', results_path]
        repair_on_failure = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(verify_cmd, self.temp_results_dir,
                                         failure_tasks=repair_on_failure)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001432
1433
class QueueTask(AgentTask):
    """Runs autoserv for a job across one or more host queue entries.

    Writes job/host keyvals, updates entry and host status around the run,
    logs aborts, and schedules post-job cleanup/reboot tasks.
    """

    def __init__(self, job, queue_entries, cmd):
        self.job = job
        self.queue_entries = queue_entries
        super(QueueTask, self).__init__(cmd, self._execution_tag())
        self._set_ids(queue_entries=queue_entries)


    def _format_keyval(self, key, value):
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        # the keyval file lives at the root of the job's execution directory
        return os.path.join(self._execution_tag(), 'keyval')


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_tag(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_keyval_after_job(self, field, value):
        # once the job has started, keyvals must be written through the drone
        # manager, paired with the autoserv process
        assert self.monitor and self.monitor.has_process()
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _write_host_keyvals(self, host):
        keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_tag(self):
        # all entries of a job run share one execution directory
        return self.queue_entries[0].execution_tag()


    def prolog(self):
        queued = int(time.mktime(self.job.created_on.timetuple()))
        self._write_keyvals_before_job({'job_queued': queued})
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        error_file_path = os.path.join(self._execution_tag(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self, success):
        if self.monitor.has_process():
            self._write_keyval_after_job("job_finished", int(time.time()))
            self._copy_and_parse_results(self.queue_entries)

        if self.monitor.lost_process:
            # we lost track of the autoserv process; record why the job
            # failed and mark every entry as failed
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        _drone_manager.write_lines_to_file(
            os.path.join(self._execution_tag(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task(False)


    def _reboot_hosts(self):
        reboot_after = self.job.reboot_after
        do_reboot = False
        if reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            num_tests_failed = self.monitor.num_tests_failed()
            do_reboot = (self.success and num_tests_failed == 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.get_host())
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task(self.success)
        self._reboot_hosts()

        # (fixed typo: message previously read "succes=%s")
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001581
1582
class RecoveryQueueTask(QueueTask):
    """A QueueTask for an autoserv process that was already running when
    the scheduler started; it adopts the existing process instead of
    launching a new one.
    """
    def __init__(self, job, queue_entries, run_monitor):
        # cmd=None: we never spawn a new autoserv, we attach to one
        super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
        self.run_monitor = run_monitor


    def run(self):
        # adopt the pre-existing process's monitor rather than starting one
        self.monitor = self.run_monitor


    def prolog(self):
        # recovering an existing process - don't do prolog
        pass
mblighbb421852008-03-11 22:36:16 +00001596
1597
class CleanupTask(PreJobTask):
    """Runs autoserv --cleanup on a host; a failed cleanup falls back to
    a RepairTask via failure_tasks.
    """
    def __init__(self, host=None, queue_entry=None):
        # exactly one of host/queue_entry must be supplied
        assert bool(host) ^ bool(queue_entry)
        if queue_entry:
            host = queue_entry.get_host()
        self.queue_entry = queue_entry
        self.host = host

        self.create_temp_resultsdir('.cleanup')
        results_path = _drone_manager.absolute_path(self.temp_results_dir)
        self.cmd = [_autoserv_path, '-p', '--cleanup', '-m', host.hostname,
                    '-r', results_path]
        fallback_repair = RepairTask(host, queue_entry=queue_entry)
        super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
                                          failure_tasks=[fallback_repair])

        self._set_ids(host=host, queue_entries=[queue_entry])
        self.set_host_log_file('cleanup', self.host)


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status("Cleaning")


    def epilog(self):
        super(CleanupTask, self).epilog()
        if not self.success:
            return
        self.host.set_status('Ready')
        self.host.update_field('dirty', 0)
1628
1629
class AbortTask(AgentTask):
    """Aborts the active tasks of the given agents and marks the queue
    entry Aborted.
    """
    def __init__(self, queue_entry, agents_to_abort):
        super(AbortTask, self).__init__('')
        self.queue_entry = queue_entry
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [queue_entry.id]
        self.agents_to_abort = agents_to_abort


    def prolog(self):
        logging.info("starting abort on host %s, job %s",
                     self.queue_entry.host_id, self.queue_entry.job_id)


    def epilog(self):
        super(AbortTask, self).epilog()
        self.queue_entry.set_status('Aborted')
        self.success = True


    def run(self):
        for target_agent in self.agents_to_abort:
            running_task = target_agent.active_task
            if running_task:
                running_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001654
1655
class FinalReparseTask(AgentTask):
    """Runs the results parser over a finished job's results directory.

    At most scheduler_config.config.max_parse_processes parses may run at
    once, tracked via the class-level counter below; run() and poll() are
    overridden so an instance simply waits until a slot is free.
    """
    # number of parse processes currently running, shared by all instances
    _num_running_parses = 0

    def __init__(self, queue_entries):
        self._queue_entries = queue_entries
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]
        self._parse_started = False

        assert len(queue_entries) > 0
        queue_entry = queue_entries[0]

        self._execution_tag = queue_entry.execution_tag()
        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
        # attach to the finished autoserv run so we can read its exit status
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
        self._final_status = self._determine_final_status()

        if _testing_mode:
            # NOTE(review): in testing mode the AgentTask constructor is not
            # called at all -- presumably only exercised by tests; confirm.
            self.cmd = 'true'
        else:
            super(FinalReparseTask, self).__init__(
                cmd=self._generate_parse_command(),
                working_directory=self._execution_tag)

        self.log_file = os.path.join(self._execution_tag, '.parse.log')


    @classmethod
    def _increment_running_parses(cls):
        cls._num_running_parses += 1


    @classmethod
    def _decrement_running_parses(cls):
        cls._num_running_parses -= 1


    @classmethod
    def _can_run_new_parse(cls):
        return (cls._num_running_parses <
                scheduler_config.config.max_parse_processes)


    def _determine_final_status(self):
        """Map the autoserv exit status to a final queue-entry status."""
        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def prolog(self):
        super(FinalReparseTask, self).prolog()
        for queue_entry in self._queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        # apply the status that was determined from autoserv's exit code
        for queue_entry in self._queue_entries:
            queue_entry.set_status(self._final_status)


    def _generate_parse_command(self):
        """Build the command line for the parser process."""
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                self._results_dir]


    def poll(self):
        # override poll to keep trying to start until the parse count goes down
        # and we can, at which point we revert to default behavior
        if self._parse_started:
            super(FinalReparseTask, self).poll()
        else:
            self._try_starting_parse()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_parse()


    def _try_starting_parse(self):
        """Start the parser if a slot is free; otherwise do nothing."""
        if not self._can_run_new_parse():
            return

        # make sure we actually have results to parse
        # this should never happen in normal operation
        if not self._autoserv_monitor.has_process():
            email_manager.manager.enqueue_notify_email(
                'No results to parse',
                'No results to parse at %s' % self._autoserv_monitor.pidfile_id)
            self.finished(False)
            return

        # actually run the parse command
        self.monitor = PidfileRunMonitor()
        self.monitor.run(self.cmd, self._working_directory,
                         log_file=self.log_file,
                         pidfile_name='.parser_execute',
                         paired_with_pidfile=self._autoserv_monitor.pidfile_id)

        self._increment_running_parses()
        self._parse_started = True


    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        # release our slot only if we actually consumed one
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001766
1767
class SetEntryPendingTask(AgentTask):
    """Moves a queue entry to Pending and schedules any follow-up agent."""

    def __init__(self, queue_entry):
        super(SetEntryPendingTask, self).__init__(cmd='')
        self._queue_entry = queue_entry
        self._set_ids(queue_entries=[queue_entry])


    def run(self):
        # on_pending() may hand back an agent to continue the job
        follow_up_agent = self._queue_entry.on_pending()
        if follow_up_agent:
            self.agent.dispatcher.add_agent(follow_up_agent)
        self.finished(True)
1780
1781
class DBError(Exception):
    """Raised when a DBObject cannot SELECT its backing row."""
1784
1785
class DBObject(object):
    """A miniature object relational model for the database.

    Subclasses wrap a single table, mapping one instance to one row.  An
    in-process cache keyed on (class, id) ensures constructing the same row
    twice yields the same instance, so e.g. many HostQueueEntry objects can
    share a single Job instance.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id - Primary key of the row to load (exclusive with row).
        @param row - A full row of values, as returned by SELECT, to
                populate this instance from (exclusive with id).
        @param new_record - True if this represents a row not yet in the
                database; save() will INSERT it.
        @param always_query - If False and this (cached) instance was
                already initialized, skip re-querying the database.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """SELECT and return the single row for row_id.

        @raises DBError if no such row exists.
        """
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            # Fixed: previously interpolated the builtin id() function
            # instead of the row id being looked up.
            raise DBError("row not found (table=%s, id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in memory fields.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is never updated through update_field()
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read this instance's fields from its database row."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        """Return the number of rows of table (default: our table)
        matching the given WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value, condition=''):
        """Set field to value both on this instance and in the database.

        @param condition - Optional extra SQL ANDed into the UPDATE's
                WHERE clause.
        """
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        if condition:
            query += ' AND (%s)' % condition
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this instance if it was created with new_record=True."""
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = ['"%s"' % self.__dict__[key] for key in keys]
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """DELETE this instance's row and drop it from the instance cache."""
        # Fixed: previously used the builtin id() function as the cache key,
        # so deleted rows were never evicted from the cache.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return string prefixed with prefix, or string unchanged if falsy."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        for row in rows:
            yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00001980
mbligh36768f02008-02-22 18:28:33 +00001981
class IneligibleHostQueue(DBObject):
    """ORM wrapper for a row of the ineligible_host_queues table."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00001985
1986
class AtomicGroup(DBObject):
    """ORM wrapper for a row of the atomic_groups table."""
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00001991
1992
class Label(DBObject):
    """ORM wrapper for a row of the labels table."""
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00001997
1998
class Host(DBObject):
    """ORM wrapper for a row of the hosts table."""
    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def current_task(self):
        """Return this host's active, incomplete HostQueueEntry, or None."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                """, (self.id,))

        if not rows:
            return None
        # a host should never be active on more than one entry at a time
        assert len(rows) == 1
        return HostQueueEntry(row=rows[0])


    def yield_work(self):
        """Requeue this host's current queue entry, if it has one."""
        logging.info("%s yielding work", self.hostname)
        if self.current_task():
            self.current_task().requeue()


    def set_status(self, status):
        """Record a new status for this host in the database."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            all_labels.append(label_name)
            if is_platform:
                platform = label_name
        return platform, all_labels


    def reverify_tasks(self):
        """Return [CleanupTask, VerifyTask] to re-verify this host."""
        tasks = [CleanupTask(host=self), VerifyTask(host=self)]
        # just to make sure this host does not get taken away
        self.set_status('Cleaning')
        return tasks
showardd8e548a2008-09-09 03:04:57 +00002055
2056
class HostQueueEntry(DBObject):
    """
    A row in host_queue_entries: the scheduling state of one job on one host
    (or an unassigned metahost/atomic-group slot waiting for a host).
    """
    _table_name = 'host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id')


    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        # per-entry log of scheduling decisions, kept under the job's tag dir
        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        # frontend URL for this entry's job, used in notification emails
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def set_host(self, host):
        """
        Assign a host to this entry (or release it when host is None),
        updating the DB row and the job/host ineligibility blocks.
        """
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        """Return the Host assigned to this entry, or None."""
        return self.host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log file."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Record that host_id may not take further entries of this job."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any (job, host) ineligibility blocks for host_id."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """
        Set the results subdirectory for this entry; defaults to the
        assigned host's hostname when subdir is not given.
        """
        if subdir is None:
            assert self.get_host()
            subdir = self.get_host().hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        # safe hostname accessor for log/email text; unassigned metahost
        # entries report 'no host'
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)


    def set_status(self, status):
        """
        Update this entry's status and the derived active/complete flags,
        then send any configured status/completion emails.

        A non-abort status never overwrites an abort status: the SQL
        condition guards the update against racing with an abort.
        """
        abort_statuses = ['Abort', 'Aborting', 'Aborted']
        if status not in abort_statuses:
            condition = ' AND '.join(['status <> "%s"' % x
                                      for x in abort_statuses])
        else:
            condition = ''
        self.update_field('status', status, condition=condition)

        logging.info("%s -> %s", self, self.status)

        if status in ['Queued', 'Parsing']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Aborting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _email_on_status(self, status):
        """Email the job's notification list about this entry's new status."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """Once the whole job has finished, email a per-host status summary."""
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def run(self, assigned_host=None):
        """
        Start (or advance toward starting) this entry's job and return the
        Agent produced by Job.run().  Metahost/atomic-group entries must be
        handed the concrete host to run on.
        """
        if self.meta_host is not None or self.atomic_group_id is not None:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.set_host(assigned_host)

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        return self.job.run(queue_entry=self)


    def requeue(self):
        """Return this entry to the queue so it can be scheduled again."""
        self.set_status('Queued')
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        self.job.stop_if_necessary()


    @property
    def aborted_by(self):
        # login of the user who aborted this entry (None if never aborted)
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # timestamp of the abort (None if never aborted)
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # cached after the first lookup via the _aborted_by attribute
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, returns an agent to run the job.  Returns None
        otherwise.
        """
        self.set_status('Pending')
        self.get_host().set_status('Pending')
        if self.job.is_ready():
            return self.job.run(self)
        self.job.stop_if_necessary()
        return None


    def abort(self, dispatcher, agents_to_abort=None):
        """
        Abort this entry: if it was active on a host, schedule that host for
        reverification, then queue an AbortTask that also aborts each agent
        in agents_to_abort.
        """
        # None sentinel instead of a mutable [] default, which would be
        # shared across calls
        if agents_to_abort is None:
            agents_to_abort = []
        host = self.get_host()
        if self.active and host:
            dispatcher.add_agent(Agent(tasks=host.reverify_tasks()))

        abort_task = AbortTask(self, agents_to_abort)
        self.set_status('Aborting')
        dispatcher.add_agent(Agent(tasks=[abort_task], num_processes=0))


    def execution_tag(self):
        """Return 'jobid-owner/subdir', this entry's results-path tag."""
        assert self.execution_subdir
        return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002307
2308
class Job(DBObject):
    """
    A row in the jobs table: a control file to be run on synch_count hosts,
    with pre-job verify/cleanup policy and email notification settings.
    """
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after')


    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)


    def is_server_job(self):
        # NOTE(review): control_type value 2 appears to denote a client-side
        # job -- confirm against the frontend's control-type enum.
        return self.control_type != 2


    def tag(self):
        """Return 'id-owner', the job's results directory name."""
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        """Return all HostQueueEntry objects for this job (always >= 1)."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert entries

        return entries


    def set_status(self, status, update_queues=False):
        """
        Set the job's status; with update_queues, propagate the same status
        to every one of its host queue entries.
        """
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_ready(self):
        """True when enough entries are Pending to satisfy synch_count."""
        pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
                                                               status='Pending')
        return (pending_entries.count() >= self.synch_count)


    def num_machines(self, clause=None):
        """Count this job's queue entries, optionally filtered by a SQL clause."""
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        """Number of entries not yet complete."""
        return self.num_machines('not complete')


    def num_active(self):
        """Number of entries currently active."""
        return self.num_machines('active')


    def num_complete(self):
        """Number of entries that have completed."""
        return self.num_machines('complete')


    def is_finished(self):
        """True when every entry of the job has completed."""
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        # entries that have not started running: Queued/Pending, and
        # optionally those still in Verifying
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Stop every Queued/Pending entry, freeing hosts of Pending ones."""
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
        """Stop the job once too few entries remain to ever reach synch_count."""
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        """Append the entry's hostname to the job's .machines file."""
        hostname = queue_entry.get_host().hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self):
        """Return the next unused 'groupN' execution subdir name for this job."""
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in groups if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return "group%d" % next_id


    def _write_control_file(self, execution_tag):
        # stage the job's control file on the drone for this execution
        control_path = _drone_manager.attach_file_to_execution(
            execution_tag, self.control_file)
        return control_path


    def get_group_entries(self, queue_entry_from_group):
        """Return all entries sharing queue_entry_from_group's execution subdir."""
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _get_autoserv_params(self, queue_entries):
        """Build the autoserv command line for a group of queue entries."""
        assert queue_entries
        execution_tag = queue_entries[0].execution_tag()
        control_path = self._write_control_file(execution_tag)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = [_autoserv_path, '-P', execution_tag, '-p', '-n',
                  '-r', _drone_manager.absolute_path(execution_tag),
                  '-u', self.owner, '-l', self.name, '-m', hostnames,
                  _drone_manager.absolute_path(control_path)]

        if not self.is_server_job():
            params.append('-c')

        return params


    def _should_run_cleanup(self, queue_entry):
        # honor the job's reboot_before policy
        if self.reboot_before == models.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == models.RebootBefore.IF_DIRTY:
            return queue_entry.get_host().dirty
        return False


    def _should_run_verify(self, queue_entry):
        # hosts under DO_NOT_VERIFY protection are never verified
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def _get_pre_job_tasks(self, queue_entry):
        """Tasks to run before the job proper: cleanup/verify, then pending."""
        tasks = []
        if self._should_run_cleanup(queue_entry):
            tasks.append(CleanupTask(queue_entry=queue_entry))
        if self._should_run_verify(queue_entry):
            tasks.append(VerifyTask(queue_entry=queue_entry))
        tasks.append(SetEntryPendingTask(queue_entry))
        return tasks


    def _assign_new_group(self, queue_entries):
        # single-host groups are named after the host; multi-host groups get
        # the next 'groupN' name
        if len(queue_entries) == 1:
            group_name = queue_entries[0].get_host().hostname
        else:
            group_name = self._next_group_name()
            logging.info('Running synchronous job %d hosts %s as %s',
                         self.id,
                         [entry.host.hostname for entry in queue_entries],
                         group_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        Pick up to synch_count entries (always including include_queue_entry,
        plus other Pending entries of this job), assign them a group name,
        and return them.
        """
        chosen_entries = [include_queue_entry]

        num_entries_needed = self.synch_count - 1
        if num_entries_needed > 0:
            pending_entries = HostQueueEntry.fetch(
                where='job_id = %s AND status = "Pending" AND id != %s',
                params=(self.id, include_queue_entry.id))
            chosen_entries += list(pending_entries)[:num_entries_needed]

        self._assign_new_group(chosen_entries)
        return chosen_entries


    def run(self, queue_entry):
        """
        Run (or advance) this job for queue_entry.  Returns an Agent: the
        pre-job tasks when the job is not yet ready, otherwise the queue
        task for the chosen group of entries.
        """
        if not self.is_ready():
            queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
            return Agent(self._get_pre_job_tasks(queue_entry))

        queue_entries = self._choose_group_to_run(queue_entry)
        return self._finish_run(queue_entries)


    def _finish_run(self, queue_entries, initial_tasks=None):
        """Mark the entries Starting and build the Agent that runs autoserv."""
        # None sentinel instead of a mutable [] default, which would be
        # shared across calls
        if initial_tasks is None:
            initial_tasks = []
        for queue_entry in queue_entries:
            queue_entry.set_status('Starting')
        params = self._get_autoserv_params(queue_entries)
        queue_task = QueueTask(job=self, queue_entries=queue_entries,
                               cmd=params)
        tasks = initial_tasks + [queue_task]

        return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002529
2530
if __name__ == '__main__':
    # Script entry point: start the scheduler.  main() is defined elsewhere
    # in this file (not visible in this chunk).
    main()