blob: 94f58fa2943e67a6775741230a9fed4142d8b077 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard89f84db2009-03-12 20:39:13 +00008import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard21baa452008-10-21 00:08:39 +000017from autotest_lib.frontend.afe import models
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
showardd1ee1dd2009-01-07 21:33:08 +000019from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000020
mblighb090f142008-02-27 21:33:46 +000021
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# $AUTOTEST_DIR, when set, overrides the install-relative default above.
# (modernized: 'key in dict' instead of the deprecated dict.has_key())
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# module-level scheduler state, initialized in main()/init()
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name
logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
63
mbligh36768f02008-02-22 18:28:33 +000064
65def main():
jadmanski0afbb632008-06-06 21:10:57 +000066 usage = 'usage: %prog [options] results_dir'
mbligh36768f02008-02-22 18:28:33 +000067
jadmanski0afbb632008-06-06 21:10:57 +000068 parser = optparse.OptionParser(usage)
69 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
70 action='store_true')
71 parser.add_option('--logfile', help='Set a log file that all stdout ' +
72 'should be redirected to. Stderr will go to this ' +
73 'file + ".err"')
74 parser.add_option('--test', help='Indicate that scheduler is under ' +
75 'test and should use dummy autoserv and no parsing',
76 action='store_true')
77 (options, args) = parser.parse_args()
78 if len(args) != 1:
79 parser.print_usage()
80 return
mbligh36768f02008-02-22 18:28:33 +000081
jadmanski0afbb632008-06-06 21:10:57 +000082 global RESULTS_DIR
83 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +000084
showardcca334f2009-03-12 20:38:34 +000085 # Change the cwd while running to avoid issues incase we were launched from
86 # somewhere odd (such as a random NFS home directory of the person running
87 # sudo to launch us as the appropriate user).
88 os.chdir(RESULTS_DIR)
89
jadmanski0afbb632008-06-06 21:10:57 +000090 c = global_config.global_config
showardd1ee1dd2009-01-07 21:33:08 +000091 notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
92 "notify_email_statuses",
93 default='')
showardc85c21b2008-11-24 22:17:37 +000094 global _notify_email_statuses
showard170873e2009-01-07 00:22:26 +000095 _notify_email_statuses = [status for status in
96 re.split(r'[\s,;:]', notify_statuses_list.lower())
97 if status]
showardc85c21b2008-11-24 22:17:37 +000098
jadmanski0afbb632008-06-06 21:10:57 +000099 if options.test:
100 global _autoserv_path
101 _autoserv_path = 'autoserv_dummy'
102 global _testing_mode
103 _testing_mode = True
mbligh36768f02008-02-22 18:28:33 +0000104
mbligh37eceaa2008-12-15 22:56:37 +0000105 # AUTOTEST_WEB.base_url is still a supported config option as some people
106 # may wish to override the entire url.
showard542e8402008-09-19 20:16:18 +0000107 global _base_url
showard170873e2009-01-07 00:22:26 +0000108 config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
109 default='')
mbligh37eceaa2008-12-15 22:56:37 +0000110 if config_base_url:
111 _base_url = config_base_url
showard542e8402008-09-19 20:16:18 +0000112 else:
mbligh37eceaa2008-12-15 22:56:37 +0000113 # For the common case of everything running on a single server you
114 # can just set the hostname in a single place in the config file.
115 server_name = c.get_config_value('SERVER', 'hostname')
116 if not server_name:
showardb18134f2009-03-20 20:52:18 +0000117 logging.critical('[SERVER] hostname missing from the config file.')
mbligh37eceaa2008-12-15 22:56:37 +0000118 sys.exit(1)
119 _base_url = 'http://%s/afe/' % server_name
showard542e8402008-09-19 20:16:18 +0000120
showardc5afc462009-01-13 00:09:39 +0000121 server = status_server.StatusServer(_drone_manager)
showardd1ee1dd2009-01-07 21:33:08 +0000122 server.start()
123
jadmanski0afbb632008-06-06 21:10:57 +0000124 try:
showardc5afc462009-01-13 00:09:39 +0000125 init(options.logfile)
126 dispatcher = Dispatcher()
127 dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)
128
jadmanski0afbb632008-06-06 21:10:57 +0000129 while not _shutdown:
130 dispatcher.tick()
showardd1ee1dd2009-01-07 21:33:08 +0000131 time.sleep(scheduler_config.config.tick_pause_sec)
jadmanski0afbb632008-06-06 21:10:57 +0000132 except:
showard170873e2009-01-07 00:22:26 +0000133 email_manager.manager.log_stacktrace(
134 "Uncaught exception; terminating monitor_db")
jadmanski0afbb632008-06-06 21:10:57 +0000135
showard170873e2009-01-07 00:22:26 +0000136 email_manager.manager.send_queued_emails()
showard55b4b542009-01-08 23:30:30 +0000137 server.shutdown()
showard170873e2009-01-07 00:22:26 +0000138 _drone_manager.shutdown()
jadmanski0afbb632008-06-06 21:10:57 +0000139 _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000140
141
def handle_sigint(signum, frame):
    """SIGINT handler: flag the main loop to exit after its current tick."""
    global _shutdown
    logging.info("Shutdown request received.")
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000146
147
def init(logfile):
    """
    One-time scheduler initialization: optional stdout/stderr redirection,
    pidfile, database connection, Django autocommit, SIGINT handler, and
    drone manager setup.  Order matters here.

    @param logfile - path to redirect stdout/stderr to, or None/'' to skip.
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # point the DB layer at the stress-test database instead of production
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # comma-separated list of drone hostnames; results are copied to
    # results_host
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000179
180
def enable_logging(logfile):
    """
    Redirect this process's stdout/stderr to logfile / logfile + ".err".

    Both the OS-level file descriptors (via dup2, so child processes are
    covered) and the Python-level sys.stdout/sys.stderr objects are replaced.
    """
    stdout_path = logfile
    stderr_path = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", stdout_path, stderr_path)
    stdout_file = open(stdout_path, "a", buffering=0)
    stderr_file = open(stderr_path, "a", buffering=0)

    os.dup2(stdout_file.fileno(), sys.stdout.fileno())
    os.dup2(stderr_file.fileno(), sys.stderr.fileno())

    sys.stdout = stdout_file
    sys.stderr = stderr_file
mbligh36768f02008-02-22 18:28:33 +0000193
194
def queue_entries_to_abort():
    """Fetch all HostQueueEntries currently marked with status 'Abort'."""
    rows = _db.execute("""
    SELECT * FROM host_queue_entries WHERE status='Abort';
    """)
    return [HostQueueEntry(row=row) for row in rows]
mbligh36768f02008-02-22 18:28:33 +0000202
showard7cf9a9b2008-05-15 21:15:52 +0000203
class SchedulerError(Exception):
    """
    Raised by HostScheduler when an inconsistent state occurs -- e.g. a host
    carrying more than one atomic-group label.
    """
206
207
class HostScheduler(object):
    """
    Matches pending HostQueueEntries to ready hosts.

    refresh() must be called each scheduling cycle to snapshot host, ACL,
    label and dependency state from the database; the find_eligible_*
    methods then consume that cached state, popping hosts out of
    self._hosts_available as they are handed out so a host is never
    scheduled twice in one cycle.
    """
    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready, inactive hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render ids as a comma-separated string for SQL IN (...) clauses."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run query (containing one %s id-list placeholder) and build a
        {left_id: set(right_ids)} mapping from the two-column result."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold (left, right) rows into {left: set(rights)}; flip=True
        swaps the mapping direction."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map job id -> set of aclgroup ids the job's owner belongs to."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map job id -> set of host ids explicitly blocked for that job."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map job id -> set of label ids the job depends on."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map host id -> set of aclgroup ids containing that host."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (label_id -> host_ids, host_id -> label_ids) mappings
        built from a single hosts_labels query."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Map label id -> Label object for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Snapshot scheduling state for this cycle: ready hosts, plus ACL,
        dependency, and label data restricted to the relevant jobs/hosts."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with the
        host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host carries every label the job depends on."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host has an only_if_needed label that the job
        neither depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """Combine the ACL, dependency, only_if_needed and atomic-group
        checks for one (host, queue entry) pair."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's pre-assigned host if it is eligible; else None."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still unclaimed this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Pick and claim any eligible host carrying the entry's metahost
        label, or return None if none qualifies."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return an available Host for a non-atomic-group queue entry, or
        None.  Dispatches on whether the entry names a host or a metahost."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = random.sample(
                    eligible_hosts_in_group, max_hosts)

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
552
553
showard170873e2009-01-07 00:22:26 +0000554class Dispatcher(object):
    def __init__(self):
        # all currently-active Agent instances
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        # host id -> set of Agents using that host
        self._host_agents = {}
        # queue entry id -> set of Agents working on that entry
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000561
mbligh36768f02008-02-22 18:28:33 +0000562
    def do_initial_recovery(self, recover_hosts=True):
        """Recover state left over from a previous scheduler run.

        @param recover_hosts - when False, skip host recovery (the
                --recover-hosts command-line flag controls this).
        """
        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000569
570
    def tick(self):
        """
        One scheduler iteration: refresh drone state, run periodic cleanup
        if due, process aborts, schedule new jobs, let agents make progress,
        then flush pending drone actions and queued email.
        """
        _drone_manager.refresh()
        self._run_cleanup_maybe()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000579
showard97aed502008-11-04 02:01:24 +0000580
showarda3ab0d52008-11-03 19:03:47 +0000581 def _run_cleanup_maybe(self):
showardd1ee1dd2009-01-07 21:33:08 +0000582 should_cleanup = (self._last_clean_time +
583 scheduler_config.config.clean_interval * 60 <
584 time.time())
585 if should_cleanup:
showardb18134f2009-03-20 20:52:18 +0000586 logging.info('Running cleanup')
showarda3ab0d52008-11-03 19:03:47 +0000587 self._abort_timed_out_jobs()
588 self._abort_jobs_past_synch_start_timeout()
589 self._clear_inactive_blocks()
showardfa8629c2008-11-04 16:51:23 +0000590 self._check_for_db_inconsistencies()
showarda3ab0d52008-11-03 19:03:47 +0000591 self._last_clean_time = time.time()
592
mbligh36768f02008-02-22 18:28:33 +0000593
showard170873e2009-01-07 00:22:26 +0000594 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
595 for object_id in object_ids:
596 agent_dict.setdefault(object_id, set()).add(agent)
597
598
599 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
600 for object_id in object_ids:
601 assert object_id in agent_dict
602 agent_dict[object_id].remove(agent)
603
604
jadmanski0afbb632008-06-06 21:10:57 +0000605 def add_agent(self, agent):
606 self._agents.append(agent)
607 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000608 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
609 self._register_agent_for_ids(self._queue_entry_agents,
610 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000611
showard170873e2009-01-07 00:22:26 +0000612
613 def get_agents_for_entry(self, queue_entry):
614 """
615 Find agents corresponding to the specified queue_entry.
616 """
617 return self._queue_entry_agents.get(queue_entry.id, set())
618
619
620 def host_has_agent(self, host):
621 """
622 Determine if there is currently an Agent present using this host.
623 """
624 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000625
626
jadmanski0afbb632008-06-06 21:10:57 +0000627 def remove_agent(self, agent):
628 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000629 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
630 agent)
631 self._unregister_agent_for_ids(self._queue_entry_agents,
632 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000633
634
showard4c5374f2008-09-04 17:02:56 +0000635 def num_running_processes(self):
636 return sum(agent.num_processes for agent in self._agents
637 if agent.is_running())
mblighbb421852008-03-11 22:36:16 +0000638
639
showard170873e2009-01-07 00:22:26 +0000640 def _extract_execution_tag(self, command_line):
641 match = re.match(r'.* -P (\S+) ', command_line)
642 if not match:
643 return None
644 return match.group(1)
mblighbb421852008-03-11 22:36:16 +0000645
646
showard2bab8f42008-11-12 18:15:22 +0000647 def _recover_queue_entries(self, queue_entries, run_monitor):
648 assert len(queue_entries) > 0
showard2bab8f42008-11-12 18:15:22 +0000649 queue_task = RecoveryQueueTask(job=queue_entries[0].job,
650 queue_entries=queue_entries,
651 run_monitor=run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +0000652 self.add_agent(Agent(tasks=[queue_task],
showard170873e2009-01-07 00:22:26 +0000653 num_processes=len(queue_entries)))
mblighbb421852008-03-11 22:36:16 +0000654
655
    def _recover_processes(self):
        """
        Startup recovery sequence: register pidfiles, refresh drone state,
        re-attach running entries, resume aborts, requeue anything else
        active, recover parsing entries, and re-verify remaining hosts.
        """
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_running_entries()
        self._recover_aborting_entries()
        self._requeue_other_active_entries()
        self._recover_parsing_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000668
showard170873e2009-01-07 00:22:26 +0000669
670 def _register_pidfiles(self):
671 # during recovery we may need to read pidfiles for both running and
672 # parsing entries
673 queue_entries = HostQueueEntry.fetch(
674 where="status IN ('Running', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000675 for queue_entry in queue_entries:
showard170873e2009-01-07 00:22:26 +0000676 pidfile_id = _drone_manager.get_pidfile_id_from(
677 queue_entry.execution_tag())
678 _drone_manager.register_pidfile(pidfile_id)
679
680
681 def _recover_running_entries(self):
682 orphans = _drone_manager.get_orphaned_autoserv_processes()
683
684 queue_entries = HostQueueEntry.fetch(where="status = 'Running'")
685 requeue_entries = []
686 for queue_entry in queue_entries:
687 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000688 # synchronous job we've already recovered
689 continue
showard170873e2009-01-07 00:22:26 +0000690 execution_tag = queue_entry.execution_tag()
691 run_monitor = PidfileRunMonitor()
692 run_monitor.attach_to_existing_process(execution_tag)
693 if not run_monitor.has_process():
694 # autoserv apparently never got run, so let it get requeued
695 continue
showarde788ea62008-11-17 21:02:47 +0000696 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showardb18134f2009-03-20 20:52:18 +0000697 logging.info('Recovering %s (process %s)',
698 (', '.join(str(entry) for entry in queue_entries),
699 run_monitor.get_process()))
showard2bab8f42008-11-12 18:15:22 +0000700 self._recover_queue_entries(queue_entries, run_monitor)
showard170873e2009-01-07 00:22:26 +0000701 orphans.pop(execution_tag, None)
mbligh90a549d2008-03-25 23:52:34 +0000702
jadmanski0afbb632008-06-06 21:10:57 +0000703 # now kill any remaining autoserv processes
showard170873e2009-01-07 00:22:26 +0000704 for process in orphans.itervalues():
showardb18134f2009-03-20 20:52:18 +0000705 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000706 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000707
showard170873e2009-01-07 00:22:26 +0000708
709 def _recover_aborting_entries(self):
710 queue_entries = HostQueueEntry.fetch(
711 where='status IN ("Abort", "Aborting")')
jadmanski0afbb632008-06-06 21:10:57 +0000712 for queue_entry in queue_entries:
showardb18134f2009-03-20 20:52:18 +0000713 logging.info('Recovering aborting QE %s', queue_entry)
showard170873e2009-01-07 00:22:26 +0000714 agent = queue_entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000715
showard97aed502008-11-04 02:01:24 +0000716
showard170873e2009-01-07 00:22:26 +0000717 def _requeue_other_active_entries(self):
718 queue_entries = HostQueueEntry.fetch(
719 where='active AND NOT complete AND status != "Pending"')
720 for queue_entry in queue_entries:
721 if self.get_agents_for_entry(queue_entry):
722 # entry has already been recovered
723 continue
showardb18134f2009-03-20 20:52:18 +0000724 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
725 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000726 if queue_entry.host:
727 tasks = queue_entry.host.reverify_tasks()
728 self.add_agent(Agent(tasks))
729 agent = queue_entry.requeue()
730
731
    def _reverify_remaining_hosts(self):
        """
        Schedule reverification for hosts left in transient states by a
        scheduler restart.
        """
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000747
748
jadmanski0afbb632008-06-06 21:10:57 +0000749 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000750 print_message='Reverifying host %s'):
751 full_where='locked = 0 AND invalid = 0 AND ' + where
752 for host in Host.fetch(where=full_where):
753 if self.host_has_agent(host):
754 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000755 continue
showard170873e2009-01-07 00:22:26 +0000756 if print_message:
showardb18134f2009-03-20 20:52:18 +0000757 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000758 tasks = host.reverify_tasks()
759 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000760
761
showard97aed502008-11-04 02:01:24 +0000762 def _recover_parsing_entries(self):
showard2bab8f42008-11-12 18:15:22 +0000763 recovered_entry_ids = set()
showard97aed502008-11-04 02:01:24 +0000764 for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
showard2bab8f42008-11-12 18:15:22 +0000765 if entry.id in recovered_entry_ids:
766 continue
767 queue_entries = entry.job.get_group_entries(entry)
showard170873e2009-01-07 00:22:26 +0000768 recovered_entry_ids = recovered_entry_ids.union(
769 entry.id for entry in queue_entries)
showardb18134f2009-03-20 20:52:18 +0000770 logging.info('Recovering parsing entries %s',
771 (', '.join(str(entry) for entry in queue_entries)))
showard97aed502008-11-04 02:01:24 +0000772
773 reparse_task = FinalReparseTask(queue_entries)
showard170873e2009-01-07 00:22:26 +0000774 self.add_agent(Agent([reparse_task], num_processes=0))
showard97aed502008-11-04 02:01:24 +0000775
776
jadmanski0afbb632008-06-06 21:10:57 +0000777 def _recover_hosts(self):
778 # recover "Repair Failed" hosts
779 message = 'Reverifying dead host %s'
780 self._reverify_hosts_where("status = 'Repair Failed'",
781 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000782
783
showard3bb499f2008-07-03 19:42:20 +0000784 def _abort_timed_out_jobs(self):
785 """
786 Aborts all jobs that have timed out and not completed
787 """
showarda3ab0d52008-11-03 19:03:47 +0000788 query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
789 where=['created_on + INTERVAL timeout HOUR < NOW()'])
790 for job in query.distinct():
showardb18134f2009-03-20 20:52:18 +0000791 logging.warning('Aborting job %d due to job timeout', job.id)
showarda3ab0d52008-11-03 19:03:47 +0000792 job.abort(None)
showard3bb499f2008-07-03 19:42:20 +0000793
794
showard98863972008-10-29 21:14:56 +0000795 def _abort_jobs_past_synch_start_timeout(self):
796 """
797 Abort synchronous jobs that are past the start timeout (from global
798 config) and are holding a machine that's in everyone.
799 """
800 timeout_delta = datetime.timedelta(
showardd1ee1dd2009-01-07 21:33:08 +0000801 minutes=scheduler_config.config.synch_job_start_timeout_minutes)
showard98863972008-10-29 21:14:56 +0000802 timeout_start = datetime.datetime.now() - timeout_delta
803 query = models.Job.objects.filter(
showard98863972008-10-29 21:14:56 +0000804 created_on__lt=timeout_start,
805 hostqueueentry__status='Pending',
showardd9ac4452009-02-07 02:04:37 +0000806 hostqueueentry__host__aclgroup__name='Everyone')
showard98863972008-10-29 21:14:56 +0000807 for job in query.distinct():
showardb18134f2009-03-20 20:52:18 +0000808 logging.warning('Aborting job %d due to start timeout', job.id)
showardff059d72008-12-03 18:18:53 +0000809 entries_to_abort = job.hostqueueentry_set.exclude(
810 status=models.HostQueueEntry.Status.RUNNING)
811 for queue_entry in entries_to_abort:
812 queue_entry.abort(None)
showard98863972008-10-29 21:14:56 +0000813
814
jadmanski0afbb632008-06-06 21:10:57 +0000815 def _clear_inactive_blocks(self):
816 """
817 Clear out blocks for all completed jobs.
818 """
819 # this would be simpler using NOT IN (subquery), but MySQL
820 # treats all IN subqueries as dependent, so this optimizes much
821 # better
822 _db.execute("""
823 DELETE ihq FROM ineligible_host_queues ihq
showard4eaaf522008-06-06 22:28:07 +0000824 LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
jadmanski0afbb632008-06-06 21:10:57 +0000825 WHERE NOT complete) hqe
826 USING (job_id) WHERE hqe.job_id IS NULL""")
showard04c82c52008-05-29 19:38:12 +0000827
828
showardb95b1bd2008-08-15 18:11:04 +0000829 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000830 # prioritize by job priority, then non-metahost over metahost, then FIFO
831 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000832 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000833 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000834 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000835
836
showard89f84db2009-03-12 20:39:13 +0000837 def _refresh_pending_queue_entries(self):
838 """
839 Lookup the pending HostQueueEntries and call our HostScheduler
840 refresh() method given that list. Return the list.
841
842 @returns A list of pending HostQueueEntries sorted in priority order.
843 """
showard63a34772008-08-18 19:32:50 +0000844 queue_entries = self._get_pending_queue_entries()
845 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000846 return []
showardb95b1bd2008-08-15 18:11:04 +0000847
showard63a34772008-08-18 19:32:50 +0000848 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000849
showard89f84db2009-03-12 20:39:13 +0000850 return queue_entries
851
852
853 def _schedule_atomic_group(self, queue_entry):
854 """
855 Schedule the given queue_entry on an atomic group of hosts.
856
857 Returns immediately if there are insufficient available hosts.
858
859 Creates new HostQueueEntries based off of queue_entry for the
860 scheduled hosts and starts them all running.
861 """
862 # This is a virtual host queue entry representing an entire
863 # atomic group, find a group and schedule their hosts.
864 group_hosts = self._host_scheduler.find_eligible_atomic_group(
865 queue_entry)
866 if not group_hosts:
867 return
868 # The first assigned host uses the original HostQueueEntry
869 group_queue_entries = [queue_entry]
870 for assigned_host in group_hosts[1:]:
871 # Create a new HQE for every additional assigned_host.
872 new_hqe = HostQueueEntry.clone(queue_entry)
873 new_hqe.save()
874 group_queue_entries.append(new_hqe)
875 assert len(group_queue_entries) == len(group_hosts)
876 for queue_entry, host in itertools.izip(group_queue_entries,
877 group_hosts):
878 self._run_queue_entry(queue_entry, host)
879
880
881 def _schedule_new_jobs(self):
882 queue_entries = self._refresh_pending_queue_entries()
883 if not queue_entries:
884 return
885
showard63a34772008-08-18 19:32:50 +0000886 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000887 if (queue_entry.atomic_group_id is None or
888 queue_entry.host_id is not None):
889 assigned_host = self._host_scheduler.find_eligible_host(
890 queue_entry)
891 if assigned_host:
892 self._run_queue_entry(queue_entry, assigned_host)
893 else:
894 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000895
896
897 def _run_queue_entry(self, queue_entry, host):
898 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000899 # in some cases (synchronous jobs with run_verify=False), agent may be
900 # None
showard9976ce92008-10-15 20:28:13 +0000901 if agent:
902 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000903
904
jadmanski0afbb632008-06-06 21:10:57 +0000905 def _find_aborting(self):
jadmanski0afbb632008-06-06 21:10:57 +0000906 for entry in queue_entries_to_abort():
showard170873e2009-01-07 00:22:26 +0000907 agents_to_abort = list(self.get_agents_for_entry(entry))
showard1be97432008-10-17 15:30:45 +0000908 for agent in agents_to_abort:
909 self.remove_agent(agent)
910
showard170873e2009-01-07 00:22:26 +0000911 entry.abort(self, agents_to_abort)
jadmanski0afbb632008-06-06 21:10:57 +0000912
913
showard324bf812009-01-20 23:23:38 +0000914 def _can_start_agent(self, agent, num_started_this_cycle,
915 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000916 # always allow zero-process agents to run
917 if agent.num_processes == 0:
918 return True
919 # don't allow any nonzero-process agents to run after we've reached a
920 # limit (this avoids starvation of many-process agents)
921 if have_reached_limit:
922 return False
923 # total process throttling
showard324bf812009-01-20 23:23:38 +0000924 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000925 return False
926 # if a single agent exceeds the per-cycle throttling, still allow it to
927 # run when it's the first agent in the cycle
928 if num_started_this_cycle == 0:
929 return True
930 # per-cycle throttling
931 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000932 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000933 return False
934 return True
935
936
jadmanski0afbb632008-06-06 21:10:57 +0000937 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000938 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000939 have_reached_limit = False
940 # iterate over copy, so we can remove agents during iteration
941 for agent in list(self._agents):
942 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000943 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000944 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000945 continue
946 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000947 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000948 have_reached_limit):
949 have_reached_limit = True
950 continue
showard4c5374f2008-09-04 17:02:56 +0000951 num_started_this_cycle += agent.num_processes
952 agent.tick()
showardb18134f2009-03-20 20:52:18 +0000953 logging.info('%d running processes',
954 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000955
956
showardfa8629c2008-11-04 16:51:23 +0000957 def _check_for_db_inconsistencies(self):
958 query = models.HostQueueEntry.objects.filter(active=True, complete=True)
959 if query.count() != 0:
960 subject = ('%d queue entries found with active=complete=1'
961 % query.count())
962 message = '\n'.join(str(entry.get_object_dict())
963 for entry in query[:50])
964 if len(query) > 50:
965 message += '\n(truncated)\n'
966
showardb18134f2009-03-20 20:52:18 +0000967 logging.error(subject)
showard170873e2009-01-07 00:22:26 +0000968 email_manager.manager.enqueue_notify_email(subject, message)
showardfa8629c2008-11-04 16:51:23 +0000969
970
showard170873e2009-01-07 00:22:26 +0000971class PidfileRunMonitor(object):
972 """
973 Client must call either run() to start a new process or
974 attach_to_existing_process().
975 """
mbligh36768f02008-02-22 18:28:33 +0000976
showard170873e2009-01-07 00:22:26 +0000977 class _PidfileException(Exception):
978 """
979 Raised when there's some unexpected behavior with the pid file, but only
980 used internally (never allowed to escape this class).
981 """
mbligh36768f02008-02-22 18:28:33 +0000982
983
showard170873e2009-01-07 00:22:26 +0000984 def __init__(self):
showard35162b02009-03-03 02:17:30 +0000985 self.lost_process = False
showard170873e2009-01-07 00:22:26 +0000986 self._start_time = None
987 self.pidfile_id = None
988 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +0000989
990
showard170873e2009-01-07 00:22:26 +0000991 def _add_nice_command(self, command, nice_level):
992 if not nice_level:
993 return command
994 return ['nice', '-n', str(nice_level)] + command
995
996
997 def _set_start_time(self):
998 self._start_time = time.time()
999
1000
1001 def run(self, command, working_directory, nice_level=None, log_file=None,
1002 pidfile_name=None, paired_with_pidfile=None):
1003 assert command is not None
1004 if nice_level is not None:
1005 command = ['nice', '-n', str(nice_level)] + command
1006 self._set_start_time()
1007 self.pidfile_id = _drone_manager.execute_command(
1008 command, working_directory, log_file=log_file,
1009 pidfile_name=pidfile_name, paired_with_pidfile=paired_with_pidfile)
1010
1011
1012 def attach_to_existing_process(self, execution_tag):
1013 self._set_start_time()
1014 self.pidfile_id = _drone_manager.get_pidfile_id_from(execution_tag)
1015 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001016
1017
jadmanski0afbb632008-06-06 21:10:57 +00001018 def kill(self):
showard170873e2009-01-07 00:22:26 +00001019 if self.has_process():
1020 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001021
mbligh36768f02008-02-22 18:28:33 +00001022
showard170873e2009-01-07 00:22:26 +00001023 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001024 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001025 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001026
1027
showard170873e2009-01-07 00:22:26 +00001028 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001029 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001030 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001031 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001032
1033
showard170873e2009-01-07 00:22:26 +00001034 def _read_pidfile(self, use_second_read=False):
1035 assert self.pidfile_id is not None, (
1036 'You must call run() or attach_to_existing_process()')
1037 contents = _drone_manager.get_pidfile_contents(
1038 self.pidfile_id, use_second_read=use_second_read)
1039 if contents.is_invalid():
1040 self._state = drone_manager.PidfileContents()
1041 raise self._PidfileException(contents)
1042 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001043
1044
showard21baa452008-10-21 00:08:39 +00001045 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001046 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1047 self._state.process, self.pidfile_id, message)
showardb18134f2009-03-20 20:52:18 +00001048 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001049 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001050 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001051
1052
1053 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001054 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001055 return
mblighbb421852008-03-11 22:36:16 +00001056
showard21baa452008-10-21 00:08:39 +00001057 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001058
showard170873e2009-01-07 00:22:26 +00001059 if self._state.process is None:
1060 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001061 return
mbligh90a549d2008-03-25 23:52:34 +00001062
showard21baa452008-10-21 00:08:39 +00001063 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001064 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001065 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001066 return
mbligh90a549d2008-03-25 23:52:34 +00001067
showard170873e2009-01-07 00:22:26 +00001068 # pid but no running process - maybe process *just* exited
1069 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001070 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001071 # autoserv exited without writing an exit code
1072 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001073 self._handle_pidfile_error(
1074 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001075
showard21baa452008-10-21 00:08:39 +00001076
1077 def _get_pidfile_info(self):
1078 """\
1079 After completion, self._state will contain:
1080 pid=None, exit_status=None if autoserv has not yet run
1081 pid!=None, exit_status=None if autoserv is running
1082 pid!=None, exit_status!=None if autoserv has completed
1083 """
1084 try:
1085 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001086 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001087 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001088
1089
showard170873e2009-01-07 00:22:26 +00001090 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001091 """\
1092 Called when no pidfile is found or no pid is in the pidfile.
1093 """
showard170873e2009-01-07 00:22:26 +00001094 message = 'No pid found at %s' % self.pidfile_id
showardb18134f2009-03-20 20:52:18 +00001095 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001096 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1097 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001098 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001099 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001100
1101
showard35162b02009-03-03 02:17:30 +00001102 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001103 """\
1104 Called when autoserv has exited without writing an exit status,
1105 or we've timed out waiting for autoserv to write a pid to the
1106 pidfile. In either case, we just return failure and the caller
1107 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001108
showard170873e2009-01-07 00:22:26 +00001109 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001110 """
1111 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001112 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001113 self._state.exit_status = 1
1114 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001115
1116
jadmanski0afbb632008-06-06 21:10:57 +00001117 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001118 self._get_pidfile_info()
1119 return self._state.exit_status
1120
1121
1122 def num_tests_failed(self):
1123 self._get_pidfile_info()
1124 assert self._state.num_tests_failed is not None
1125 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001126
1127
mbligh36768f02008-02-22 18:28:33 +00001128class Agent(object):
showard170873e2009-01-07 00:22:26 +00001129 def __init__(self, tasks, num_processes=1):
jadmanski0afbb632008-06-06 21:10:57 +00001130 self.active_task = None
1131 self.queue = Queue.Queue(0)
1132 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001133 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001134
showard170873e2009-01-07 00:22:26 +00001135 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1136 for task in tasks)
1137 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1138
jadmanski0afbb632008-06-06 21:10:57 +00001139 for task in tasks:
1140 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001141
1142
showard170873e2009-01-07 00:22:26 +00001143 def _union_ids(self, id_lists):
1144 return set(itertools.chain(*id_lists))
1145
1146
jadmanski0afbb632008-06-06 21:10:57 +00001147 def add_task(self, task):
1148 self.queue.put_nowait(task)
1149 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001150
1151
jadmanski0afbb632008-06-06 21:10:57 +00001152 def tick(self):
showard21baa452008-10-21 00:08:39 +00001153 while not self.is_done():
1154 if self.active_task and not self.active_task.is_done():
1155 self.active_task.poll()
1156 if not self.active_task.is_done():
1157 return
1158 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001159
1160
jadmanski0afbb632008-06-06 21:10:57 +00001161 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001162 logging.info("agent picking task")
jadmanski0afbb632008-06-06 21:10:57 +00001163 if self.active_task:
1164 assert self.active_task.is_done()
mbligh36768f02008-02-22 18:28:33 +00001165
jadmanski0afbb632008-06-06 21:10:57 +00001166 if not self.active_task.success:
1167 self.on_task_failure()
mblighe2586682008-02-29 22:45:46 +00001168
jadmanski0afbb632008-06-06 21:10:57 +00001169 self.active_task = None
1170 if not self.is_done():
1171 self.active_task = self.queue.get_nowait()
1172 if self.active_task:
1173 self.active_task.start()
mbligh36768f02008-02-22 18:28:33 +00001174
1175
jadmanski0afbb632008-06-06 21:10:57 +00001176 def on_task_failure(self):
1177 self.queue = Queue.Queue(0)
showardccbd6c52009-03-21 00:10:21 +00001178 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1179 # get reset.
1180 new_agent = Agent(self.active_task.failure_tasks)
1181 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001182
mblighe2586682008-02-29 22:45:46 +00001183
showard4c5374f2008-09-04 17:02:56 +00001184 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001185 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001186
1187
jadmanski0afbb632008-06-06 21:10:57 +00001188 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001189 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001190
1191
jadmanski0afbb632008-06-06 21:10:57 +00001192 def start(self):
1193 assert self.dispatcher
mbligh36768f02008-02-22 18:28:33 +00001194
jadmanski0afbb632008-06-06 21:10:57 +00001195 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001196
jadmanski0afbb632008-06-06 21:10:57 +00001197
mbligh36768f02008-02-22 18:28:33 +00001198class AgentTask(object):
showard170873e2009-01-07 00:22:26 +00001199 def __init__(self, cmd, working_directory=None, failure_tasks=[]):
jadmanski0afbb632008-06-06 21:10:57 +00001200 self.done = False
1201 self.failure_tasks = failure_tasks
1202 self.started = False
1203 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001204 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001205 self.task = None
1206 self.agent = None
1207 self.monitor = None
1208 self.success = None
showard170873e2009-01-07 00:22:26 +00001209 self.queue_entry_ids = []
1210 self.host_ids = []
1211 self.log_file = None
1212
1213
1214 def _set_ids(self, host=None, queue_entries=None):
1215 if queue_entries and queue_entries != [None]:
1216 self.host_ids = [entry.host.id for entry in queue_entries]
1217 self.queue_entry_ids = [entry.id for entry in queue_entries]
1218 else:
1219 assert host
1220 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001221
1222
jadmanski0afbb632008-06-06 21:10:57 +00001223 def poll(self):
jadmanski0afbb632008-06-06 21:10:57 +00001224 if self.monitor:
1225 self.tick(self.monitor.exit_code())
1226 else:
1227 self.finished(False)
mbligh36768f02008-02-22 18:28:33 +00001228
1229
jadmanski0afbb632008-06-06 21:10:57 +00001230 def tick(self, exit_code):
showard170873e2009-01-07 00:22:26 +00001231 if exit_code is None:
jadmanski0afbb632008-06-06 21:10:57 +00001232 return
jadmanski0afbb632008-06-06 21:10:57 +00001233 if exit_code == 0:
1234 success = True
1235 else:
1236 success = False
mbligh36768f02008-02-22 18:28:33 +00001237
jadmanski0afbb632008-06-06 21:10:57 +00001238 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001239
1240
jadmanski0afbb632008-06-06 21:10:57 +00001241 def is_done(self):
1242 return self.done
mbligh36768f02008-02-22 18:28:33 +00001243
1244
jadmanski0afbb632008-06-06 21:10:57 +00001245 def finished(self, success):
1246 self.done = True
1247 self.success = success
1248 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001249
1250
jadmanski0afbb632008-06-06 21:10:57 +00001251 def prolog(self):
1252 pass
mblighd64e5702008-04-04 21:39:28 +00001253
1254
jadmanski0afbb632008-06-06 21:10:57 +00001255 def create_temp_resultsdir(self, suffix=''):
showard170873e2009-01-07 00:22:26 +00001256 self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
mblighd64e5702008-04-04 21:39:28 +00001257
mbligh36768f02008-02-22 18:28:33 +00001258
jadmanski0afbb632008-06-06 21:10:57 +00001259 def cleanup(self):
showard170873e2009-01-07 00:22:26 +00001260 if self.monitor and self.log_file:
1261 _drone_manager.copy_to_results_repository(
1262 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001263
1264
jadmanski0afbb632008-06-06 21:10:57 +00001265 def epilog(self):
1266 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001267
1268
jadmanski0afbb632008-06-06 21:10:57 +00001269 def start(self):
1270 assert self.agent
1271
1272 if not self.started:
1273 self.prolog()
1274 self.run()
1275
1276 self.started = True
1277
1278
1279 def abort(self):
1280 if self.monitor:
1281 self.monitor.kill()
1282 self.done = True
1283 self.cleanup()
1284
1285
showard170873e2009-01-07 00:22:26 +00001286 def set_host_log_file(self, base_name, host):
1287 filename = '%s.%s' % (time.time(), base_name)
1288 self.log_file = os.path.join('hosts', host.hostname, filename)
1289
1290
showardde634ee2009-01-30 01:44:24 +00001291 def _get_consistent_execution_tag(self, queue_entries):
1292 first_execution_tag = queue_entries[0].execution_tag()
1293 for queue_entry in queue_entries[1:]:
1294 assert queue_entry.execution_tag() == first_execution_tag, (
1295 '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
1296 queue_entry,
1297 first_execution_tag,
1298 queue_entries[0]))
1299 return first_execution_tag
1300
1301
showard678df4f2009-02-04 21:36:39 +00001302 def _copy_and_parse_results(self, queue_entries):
showardde634ee2009-01-30 01:44:24 +00001303 assert len(queue_entries) > 0
1304 assert self.monitor
1305 execution_tag = self._get_consistent_execution_tag(queue_entries)
showard678df4f2009-02-04 21:36:39 +00001306 results_path = execution_tag + '/'
1307 _drone_manager.copy_to_results_repository(self.monitor.get_process(),
1308 results_path)
showardde634ee2009-01-30 01:44:24 +00001309
1310 reparse_task = FinalReparseTask(queue_entries)
1311 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1312
1313
jadmanski0afbb632008-06-06 21:10:57 +00001314 def run(self):
1315 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001316 self.monitor = PidfileRunMonitor()
1317 self.monitor.run(self.cmd, self._working_directory,
1318 nice_level=AUTOSERV_NICE_LEVEL,
1319 log_file=self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001320
1321
class RepairTask(AgentTask):
    """
    Runs autoserv -R to repair a host; on failure, optionally fails the
    queue entry that triggered the repair.
    """
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.create_temp_resultsdir('.repair')
        cmd = [_autoserv_path , '-p', '-R', '-m', host.hostname,
               '-r', _drone_manager.absolute_path(self.temp_results_dir),
               '--host-protection', protection]
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # detach the entry from this host up front; if repair succeeds it can
        # be rescheduled anywhere
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _fail_queue_entry(self):
        # order matters here: re-read the entry from the DB, claim an
        # execution subdir, copy the repair logs there, then parse and mark
        # the failure
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_and_parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001382
1383
showard8fe93b52008-11-18 17:53:22 +00001384class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001385 def epilog(self):
1386 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001387 should_copy_results = (self.queue_entry and not self.success
1388 and not self.queue_entry.meta_host)
1389 if should_copy_results:
1390 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001391 destination = os.path.join(self.queue_entry.execution_tag(),
1392 os.path.basename(self.log_file))
1393 _drone_manager.copy_to_results_repository(
1394 self.monitor.get_process(), self.log_file,
1395 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001396
1397
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' to verify a host, scheduling a repair on failure."""

    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry / host must be given
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        verify_cmd = [_autoserv_path, '-p', '-v', '-m', self.host.hostname,
                      '-r',
                      _drone_manager.absolute_path(self.temp_results_dir)]
        repair_on_failure = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(verify_cmd, self.temp_results_dir,
                                         failure_tasks=repair_on_failure)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        """Flag both the entry (if any) and the host as Verifying."""
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001428
1429
mbligh36768f02008-02-22 18:28:33 +00001430class QueueTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001431 def __init__(self, job, queue_entries, cmd):
jadmanski0afbb632008-06-06 21:10:57 +00001432 self.job = job
1433 self.queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001434 super(QueueTask, self).__init__(cmd, self._execution_tag())
1435 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001436
1437
showard170873e2009-01-07 00:22:26 +00001438 def _format_keyval(self, key, value):
1439 return '%s=%s' % (key, value)
mbligh36768f02008-02-22 18:28:33 +00001440
1441
showard73ec0442009-02-07 02:05:20 +00001442 def _keyval_path(self):
1443 return os.path.join(self._execution_tag(), 'keyval')
1444
1445
1446 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1447 keyval_contents = '\n'.join(self._format_keyval(key, value)
1448 for key, value in keyval_dict.iteritems())
1449 # always end with a newline to allow additional keyvals to be written
1450 keyval_contents += '\n'
1451 _drone_manager.attach_file_to_execution(self._execution_tag(),
1452 keyval_contents,
1453 file_path=keyval_path)
1454
1455
1456 def _write_keyvals_before_job(self, keyval_dict):
1457 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1458
1459
1460 def _write_keyval_after_job(self, field, value):
showard170873e2009-01-07 00:22:26 +00001461 assert self.monitor and self.monitor.has_process()
showard170873e2009-01-07 00:22:26 +00001462 _drone_manager.write_lines_to_file(
showard73ec0442009-02-07 02:05:20 +00001463 self._keyval_path(), [self._format_keyval(field, value)],
showard35162b02009-03-03 02:17:30 +00001464 paired_with_process=self.monitor.get_process())
showardd8e548a2008-09-09 03:04:57 +00001465
1466
showard170873e2009-01-07 00:22:26 +00001467 def _write_host_keyvals(self, host):
1468 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1469 host.hostname)
1470 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001471 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1472 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001473
1474
showard170873e2009-01-07 00:22:26 +00001475 def _execution_tag(self):
1476 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001477
1478
jadmanski0afbb632008-06-06 21:10:57 +00001479 def prolog(self):
showard73ec0442009-02-07 02:05:20 +00001480 queued = int(time.mktime(self.job.created_on.timetuple()))
1481 self._write_keyvals_before_job({'job_queued': queued})
jadmanski0afbb632008-06-06 21:10:57 +00001482 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001483 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001484 queue_entry.set_status('Running')
1485 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001486 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001487 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001488 assert len(self.queue_entries) == 1
1489 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001490
1491
showard35162b02009-03-03 02:17:30 +00001492 def _write_lost_process_error_file(self):
1493 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1494 _drone_manager.write_lines_to_file(error_file_path,
1495 [_LOST_PROCESS_ERROR])
1496
1497
showard97aed502008-11-04 02:01:24 +00001498 def _finish_task(self, success):
showard35162b02009-03-03 02:17:30 +00001499 if self.monitor.has_process():
1500 self._write_keyval_after_job("job_finished", int(time.time()))
1501 self._copy_and_parse_results(self.queue_entries)
1502
1503 if self.monitor.lost_process:
1504 self._write_lost_process_error_file()
1505 for queue_entry in self.queue_entries:
1506 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001507
1508
showardcbd74612008-11-19 21:42:02 +00001509 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001510 _drone_manager.write_lines_to_file(
1511 os.path.join(self._execution_tag(), 'status.log'),
1512 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001513 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001514
1515
jadmanskif7fa2cc2008-10-01 14:13:23 +00001516 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001517 if not self.monitor or not self.monitor.has_process():
1518 return
1519
jadmanskif7fa2cc2008-10-01 14:13:23 +00001520 # build up sets of all the aborted_by and aborted_on values
1521 aborted_by, aborted_on = set(), set()
1522 for queue_entry in self.queue_entries:
1523 if queue_entry.aborted_by:
1524 aborted_by.add(queue_entry.aborted_by)
1525 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1526 aborted_on.add(t)
1527
1528 # extract some actual, unique aborted by value and write it out
1529 assert len(aborted_by) <= 1
1530 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001531 aborted_by_value = aborted_by.pop()
1532 aborted_on_value = max(aborted_on)
1533 else:
1534 aborted_by_value = 'autotest_system'
1535 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001536
showarda0382352009-02-11 23:36:43 +00001537 self._write_keyval_after_job("aborted_by", aborted_by_value)
1538 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001539
showardcbd74612008-11-19 21:42:02 +00001540 aborted_on_string = str(datetime.datetime.fromtimestamp(
1541 aborted_on_value))
1542 self._write_status_comment('Job aborted by %s on %s' %
1543 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001544
1545
jadmanski0afbb632008-06-06 21:10:57 +00001546 def abort(self):
1547 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001548 self._log_abort()
showard97aed502008-11-04 02:01:24 +00001549 self._finish_task(False)
jadmanskic2ac77f2008-05-16 21:44:04 +00001550
1551
showard21baa452008-10-21 00:08:39 +00001552 def _reboot_hosts(self):
1553 reboot_after = self.job.reboot_after
1554 do_reboot = False
showard0fc38302008-10-23 00:44:07 +00001555 if reboot_after == models.RebootAfter.ALWAYS:
showard21baa452008-10-21 00:08:39 +00001556 do_reboot = True
showard0fc38302008-10-23 00:44:07 +00001557 elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
showard21baa452008-10-21 00:08:39 +00001558 num_tests_failed = self.monitor.num_tests_failed()
1559 do_reboot = (self.success and num_tests_failed == 0)
1560
showard8ebca792008-11-04 21:54:22 +00001561 for queue_entry in self.queue_entries:
1562 if do_reboot:
showard45ae8192008-11-05 19:32:53 +00001563 # don't pass the queue entry to the CleanupTask. if the cleanup
showardfa8629c2008-11-04 16:51:23 +00001564 # fails, the job doesn't care -- it's over.
showard45ae8192008-11-05 19:32:53 +00001565 cleanup_task = CleanupTask(host=queue_entry.get_host())
1566 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
showard8ebca792008-11-04 21:54:22 +00001567 else:
1568 queue_entry.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001569
1570
jadmanski0afbb632008-06-06 21:10:57 +00001571 def epilog(self):
1572 super(QueueTask, self).epilog()
showard97aed502008-11-04 02:01:24 +00001573 self._finish_task(self.success)
showard21baa452008-10-21 00:08:39 +00001574 self._reboot_hosts()
mblighbb421852008-03-11 22:36:16 +00001575
showardb18134f2009-03-20 20:52:18 +00001576 logging.info("queue_task finished with succes=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001577
1578
mblighbb421852008-03-11 22:36:16 +00001579class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001580 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001581 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
jadmanski0afbb632008-06-06 21:10:57 +00001582 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001583
1584
jadmanski0afbb632008-06-06 21:10:57 +00001585 def run(self):
1586 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001587
1588
jadmanski0afbb632008-06-06 21:10:57 +00001589 def prolog(self):
1590 # recovering an existing process - don't do prolog
1591 pass
mblighbb421852008-03-11 22:36:16 +00001592
1593
showard8fe93b52008-11-18 17:53:22 +00001594class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001595 def __init__(self, host=None, queue_entry=None):
1596 assert bool(host) ^ bool(queue_entry)
1597 if queue_entry:
1598 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001599 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001600 self.host = host
showard170873e2009-01-07 00:22:26 +00001601
1602 self.create_temp_resultsdir('.cleanup')
1603 self.cmd = [_autoserv_path, '-p', '--cleanup', '-m', host.hostname,
1604 '-r', _drone_manager.absolute_path(self.temp_results_dir)]
showarde788ea62008-11-17 21:02:47 +00001605 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001606 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1607 failure_tasks=[repair_task])
1608
1609 self._set_ids(host=host, queue_entries=[queue_entry])
1610 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001611
mblighd5c95802008-03-05 00:33:46 +00001612
jadmanski0afbb632008-06-06 21:10:57 +00001613 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001614 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001615 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001616 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001617
mblighd5c95802008-03-05 00:33:46 +00001618
showard21baa452008-10-21 00:08:39 +00001619 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001620 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001621 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001622 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001623 self.host.update_field('dirty', 0)
1624
1625
mblighd5c95802008-03-05 00:33:46 +00001626class AbortTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001627 def __init__(self, queue_entry, agents_to_abort):
jadmanski0afbb632008-06-06 21:10:57 +00001628 super(AbortTask, self).__init__('')
showard170873e2009-01-07 00:22:26 +00001629 self.queue_entry = queue_entry
1630 # don't use _set_ids, since we don't want to set the host_ids
1631 self.queue_entry_ids = [queue_entry.id]
1632 self.agents_to_abort = agents_to_abort
mbligh36768f02008-02-22 18:28:33 +00001633
1634
jadmanski0afbb632008-06-06 21:10:57 +00001635 def prolog(self):
showardb18134f2009-03-20 20:52:18 +00001636 logging.info("starting abort on host %s, job %s",
1637 self.queue_entry.host_id, self.queue_entry.job_id)
mbligh36768f02008-02-22 18:28:33 +00001638
mblighd64e5702008-04-04 21:39:28 +00001639
jadmanski0afbb632008-06-06 21:10:57 +00001640 def epilog(self):
1641 super(AbortTask, self).epilog()
1642 self.queue_entry.set_status('Aborted')
1643 self.success = True
1644
1645
1646 def run(self):
1647 for agent in self.agents_to_abort:
1648 if (agent.active_task):
1649 agent.active_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001650
1651
showard97aed502008-11-04 02:01:24 +00001652class FinalReparseTask(AgentTask):
showard97aed502008-11-04 02:01:24 +00001653 _num_running_parses = 0
1654
1655 def __init__(self, queue_entries):
1656 self._queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001657 # don't use _set_ids, since we don't want to set the host_ids
1658 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00001659 self._parse_started = False
1660
1661 assert len(queue_entries) > 0
1662 queue_entry = queue_entries[0]
showard97aed502008-11-04 02:01:24 +00001663
showard170873e2009-01-07 00:22:26 +00001664 self._execution_tag = queue_entry.execution_tag()
1665 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1666 self._autoserv_monitor = PidfileRunMonitor()
1667 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1668 self._final_status = self._determine_final_status()
1669
showard97aed502008-11-04 02:01:24 +00001670 if _testing_mode:
1671 self.cmd = 'true'
showard170873e2009-01-07 00:22:26 +00001672 else:
1673 super(FinalReparseTask, self).__init__(
1674 cmd=self._generate_parse_command(),
1675 working_directory=self._execution_tag)
showard97aed502008-11-04 02:01:24 +00001676
showard170873e2009-01-07 00:22:26 +00001677 self.log_file = os.path.join(self._execution_tag, '.parse.log')
showard97aed502008-11-04 02:01:24 +00001678
1679
1680 @classmethod
1681 def _increment_running_parses(cls):
1682 cls._num_running_parses += 1
1683
1684
1685 @classmethod
1686 def _decrement_running_parses(cls):
1687 cls._num_running_parses -= 1
1688
1689
1690 @classmethod
1691 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00001692 return (cls._num_running_parses <
1693 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00001694
1695
showard170873e2009-01-07 00:22:26 +00001696 def _determine_final_status(self):
1697 # we'll use a PidfileRunMonitor to read the autoserv exit status
1698 if self._autoserv_monitor.exit_code() == 0:
1699 return models.HostQueueEntry.Status.COMPLETED
1700 return models.HostQueueEntry.Status.FAILED
1701
1702
showard97aed502008-11-04 02:01:24 +00001703 def prolog(self):
1704 super(FinalReparseTask, self).prolog()
1705 for queue_entry in self._queue_entries:
1706 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
1707
1708
1709 def epilog(self):
1710 super(FinalReparseTask, self).epilog()
showard97aed502008-11-04 02:01:24 +00001711 for queue_entry in self._queue_entries:
showard170873e2009-01-07 00:22:26 +00001712 queue_entry.set_status(self._final_status)
showard97aed502008-11-04 02:01:24 +00001713
1714
showard2bab8f42008-11-12 18:15:22 +00001715 def _generate_parse_command(self):
showard170873e2009-01-07 00:22:26 +00001716 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
1717 self._results_dir]
showard97aed502008-11-04 02:01:24 +00001718
1719
1720 def poll(self):
1721 # override poll to keep trying to start until the parse count goes down
1722 # and we can, at which point we revert to default behavior
1723 if self._parse_started:
1724 super(FinalReparseTask, self).poll()
1725 else:
1726 self._try_starting_parse()
1727
1728
1729 def run(self):
1730 # override run() to not actually run unless we can
1731 self._try_starting_parse()
1732
1733
1734 def _try_starting_parse(self):
1735 if not self._can_run_new_parse():
1736 return
showard170873e2009-01-07 00:22:26 +00001737
showard678df4f2009-02-04 21:36:39 +00001738 # make sure we actually have results to parse
showard35162b02009-03-03 02:17:30 +00001739 # this should never happen in normal operation
showard678df4f2009-02-04 21:36:39 +00001740 if not self._autoserv_monitor.has_process():
1741 email_manager.manager.enqueue_notify_email(
1742 'No results to parse',
1743 'No results to parse at %s' % self._autoserv_monitor.pidfile_id)
1744 self.finished(False)
1745 return
1746
showard97aed502008-11-04 02:01:24 +00001747 # actually run the parse command
showard170873e2009-01-07 00:22:26 +00001748 self.monitor = PidfileRunMonitor()
1749 self.monitor.run(self.cmd, self._working_directory,
1750 log_file=self.log_file,
1751 pidfile_name='.parser_execute',
1752 paired_with_pidfile=self._autoserv_monitor.pidfile_id)
1753
showard97aed502008-11-04 02:01:24 +00001754 self._increment_running_parses()
1755 self._parse_started = True
1756
1757
1758 def finished(self, success):
1759 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00001760 if self._parse_started:
1761 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001762
1763
showardc9ae1782009-01-30 01:42:37 +00001764class SetEntryPendingTask(AgentTask):
1765 def __init__(self, queue_entry):
1766 super(SetEntryPendingTask, self).__init__(cmd='')
1767 self._queue_entry = queue_entry
1768 self._set_ids(queue_entries=[queue_entry])
1769
1770
1771 def run(self):
1772 agent = self._queue_entry.on_pending()
1773 if agent:
1774 self.agent.dispatcher.add_agent(agent)
1775 self.finished(True)
1776
1777
class DBError(Exception):
    """Raised by the DBObject constructor when its select fails."""
1780
1781
mbligh36768f02008-02-22 18:28:33 +00001782class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00001783 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00001784
1785 # Subclasses MUST override these:
1786 _table_name = ''
1787 _fields = ()
1788
showarda3c58572009-03-12 20:36:59 +00001789 # A mapping from (type, id) to the instance of the object for that
1790 # particular id. This prevents us from creating new Job() and Host()
1791 # instances for every HostQueueEntry object that we instantiate as
1792 # multiple HQEs often share the same Job.
1793 _instances_by_type_and_id = weakref.WeakValueDictionary()
1794 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001795
showarda3c58572009-03-12 20:36:59 +00001796
1797 def __new__(cls, id=None, **kwargs):
1798 """
1799 Look to see if we already have an instance for this particular type
1800 and id. If so, use it instead of creating a duplicate instance.
1801 """
1802 if id is not None:
1803 instance = cls._instances_by_type_and_id.get((cls, id))
1804 if instance:
1805 return instance
1806 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
1807
1808
1809 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00001810 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00001811 assert self._table_name, '_table_name must be defined in your class'
1812 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00001813 if not new_record:
1814 if self._initialized and not always_query:
1815 return # We've already been initialized.
1816 if id is None:
1817 id = row[0]
1818 # Tell future constructors to use us instead of re-querying while
1819 # this instance is still around.
1820 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00001821
showard6ae5ea92009-02-25 00:11:51 +00001822 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00001823
jadmanski0afbb632008-06-06 21:10:57 +00001824 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00001825
jadmanski0afbb632008-06-06 21:10:57 +00001826 if row is None:
showardccbd6c52009-03-21 00:10:21 +00001827 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00001828
showarda3c58572009-03-12 20:36:59 +00001829 if self._initialized:
1830 differences = self._compare_fields_in_row(row)
1831 if differences:
showard7629f142009-03-27 21:02:02 +00001832 logging.warn(
1833 'initialized %s %s instance requery is updating: %s',
1834 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00001835 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00001836 self._initialized = True
1837
1838
1839 @classmethod
1840 def _clear_instance_cache(cls):
1841 """Used for testing, clear the internal instance cache."""
1842 cls._instances_by_type_and_id.clear()
1843
1844
showardccbd6c52009-03-21 00:10:21 +00001845 def _fetch_row_from_db(self, row_id):
1846 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
1847 rows = _db.execute(sql, (row_id,))
1848 if not rows:
1849 raise DBError("row not found (table=%s, id=%s)"
1850 % (self.__table, id))
1851 return rows[0]
1852
1853
showarda3c58572009-03-12 20:36:59 +00001854 def _assert_row_length(self, row):
1855 assert len(row) == len(self._fields), (
1856 "table = %s, row = %s/%d, fields = %s/%d" % (
1857 self.__table, row, len(row), self._fields, len(self._fields)))
1858
1859
1860 def _compare_fields_in_row(self, row):
1861 """
1862 Given a row as returned by a SELECT query, compare it to our existing
1863 in memory fields.
1864
1865 @param row - A sequence of values corresponding to fields named in
1866 The class attribute _fields.
1867
1868 @returns A dictionary listing the differences keyed by field name
1869 containing tuples of (current_value, row_value).
1870 """
1871 self._assert_row_length(row)
1872 differences = {}
1873 for field, row_value in itertools.izip(self._fields, row):
1874 current_value = getattr(self, field)
1875 if current_value != row_value:
1876 differences[field] = (current_value, row_value)
1877 return differences
showard2bab8f42008-11-12 18:15:22 +00001878
1879
1880 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00001881 """
1882 Update our field attributes using a single row returned by SELECT.
1883
1884 @param row - A sequence of values corresponding to fields named in
1885 the class fields list.
1886 """
1887 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00001888
showard2bab8f42008-11-12 18:15:22 +00001889 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00001890 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00001891 setattr(self, field, value)
1892 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00001893
showard2bab8f42008-11-12 18:15:22 +00001894 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00001895
mblighe2586682008-02-29 22:45:46 +00001896
showardccbd6c52009-03-21 00:10:21 +00001897 def update_from_database(self):
1898 assert self.id is not None
1899 row = self._fetch_row_from_db(self.id)
1900 self._update_fields_from_row(row)
1901
1902
jadmanski0afbb632008-06-06 21:10:57 +00001903 def count(self, where, table = None):
1904 if not table:
1905 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00001906
jadmanski0afbb632008-06-06 21:10:57 +00001907 rows = _db.execute("""
1908 SELECT count(*) FROM %s
1909 WHERE %s
1910 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00001911
jadmanski0afbb632008-06-06 21:10:57 +00001912 assert len(rows) == 1
1913
1914 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00001915
1916
mblighf8c624d2008-07-03 16:58:45 +00001917 def update_field(self, field, value, condition=''):
showard2bab8f42008-11-12 18:15:22 +00001918 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00001919
showard2bab8f42008-11-12 18:15:22 +00001920 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00001921 return
mbligh36768f02008-02-22 18:28:33 +00001922
mblighf8c624d2008-07-03 16:58:45 +00001923 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
1924 if condition:
1925 query += ' AND (%s)' % condition
jadmanski0afbb632008-06-06 21:10:57 +00001926 _db.execute(query, (value, self.id))
1927
showard2bab8f42008-11-12 18:15:22 +00001928 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00001929
1930
jadmanski0afbb632008-06-06 21:10:57 +00001931 def save(self):
1932 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00001933 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00001934 columns = ','.join([str(key) for key in keys])
1935 values = ['"%s"' % self.__dict__[key] for key in keys]
showard89f84db2009-03-12 20:39:13 +00001936 values_str = ','.join(values)
1937 query = ('INSERT INTO %s (%s) VALUES (%s)' %
1938 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00001939 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00001940 # Update our id to the one the database just assigned to us.
1941 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00001942
1943
jadmanski0afbb632008-06-06 21:10:57 +00001944 def delete(self):
showarda3c58572009-03-12 20:36:59 +00001945 self._instances_by_type_and_id.pop((type(self), id), None)
1946 self._initialized = False
1947 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00001948 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
1949 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00001950
1951
showard63a34772008-08-18 19:32:50 +00001952 @staticmethod
1953 def _prefix_with(string, prefix):
1954 if string:
1955 string = prefix + string
1956 return string
1957
1958
jadmanski0afbb632008-06-06 21:10:57 +00001959 @classmethod
showard989f25d2008-10-01 11:38:11 +00001960 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00001961 """
1962 Construct instances of our class based on the given database query.
1963
1964 @yields One class instance for each row fetched.
1965 """
showard63a34772008-08-18 19:32:50 +00001966 order_by = cls._prefix_with(order_by, 'ORDER BY ')
1967 where = cls._prefix_with(where, 'WHERE ')
1968 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00001969 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00001970 'joins' : joins,
1971 'where' : where,
1972 'order_by' : order_by})
1973 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00001974 for row in rows:
1975 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00001976
mbligh36768f02008-02-22 18:28:33 +00001977
class IneligibleHostQueue(DBObject):
    """Marks a host as ineligible to be scheduled for a particular job."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00001981
1982
class AtomicGroup(DBObject):
    """A named group of machines scheduled together (atomic_groups table)."""
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00001987
1988
class Label(DBObject):
    """A host label (possibly a platform or atomic-group label)."""
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00001993
1994
mbligh36768f02008-02-22 18:28:33 +00001995class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00001996 _table_name = 'hosts'
1997 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
1998 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
1999
2000
jadmanski0afbb632008-06-06 21:10:57 +00002001 def current_task(self):
2002 rows = _db.execute("""
2003 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2004 """, (self.id,))
2005
2006 if len(rows) == 0:
2007 return None
2008 else:
2009 assert len(rows) == 1
2010 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002011 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002012
2013
jadmanski0afbb632008-06-06 21:10:57 +00002014 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002015 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002016 if self.current_task():
2017 self.current_task().requeue()
2018
showard6ae5ea92009-02-25 00:11:51 +00002019
jadmanski0afbb632008-06-06 21:10:57 +00002020 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002021 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002022 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002023
2024
showard170873e2009-01-07 00:22:26 +00002025 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002026 """
showard170873e2009-01-07 00:22:26 +00002027 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002028 """
2029 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002030 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002031 FROM labels
2032 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002033 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002034 ORDER BY labels.name
2035 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002036 platform = None
2037 all_labels = []
2038 for label_name, is_platform in rows:
2039 if is_platform:
2040 platform = label_name
2041 all_labels.append(label_name)
2042 return platform, all_labels
2043
2044
2045 def reverify_tasks(self):
2046 cleanup_task = CleanupTask(host=self)
2047 verify_task = VerifyTask(host=self)
2048 # just to make sure this host does not get taken away
2049 self.set_status('Cleaning')
2050 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002051
2052
mbligh36768f02008-02-22 18:28:33 +00002053class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002054 _table_name = 'host_queue_entries'
2055 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002056 'active', 'complete', 'deleted', 'execution_subdir',
2057 'atomic_group_id')
showard6ae5ea92009-02-25 00:11:51 +00002058
2059
showarda3c58572009-03-12 20:36:59 +00002060 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002061 assert id or row
showarda3c58572009-03-12 20:36:59 +00002062 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002063 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002064
jadmanski0afbb632008-06-06 21:10:57 +00002065 if self.host_id:
2066 self.host = Host(self.host_id)
2067 else:
2068 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002069
showard170873e2009-01-07 00:22:26 +00002070 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002071 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002072
2073
showard89f84db2009-03-12 20:39:13 +00002074 @classmethod
2075 def clone(cls, template):
2076 """
2077 Creates a new row using the values from a template instance.
2078
2079 The new instance will not exist in the database or have a valid
2080 id attribute until its save() method is called.
2081 """
2082 assert isinstance(template, cls)
2083 new_row = [getattr(template, field) for field in cls._fields]
2084 clone = cls(row=new_row, new_record=True)
2085 clone.id = None
2086 return clone
2087
2088
showardc85c21b2008-11-24 22:17:37 +00002089 def _view_job_url(self):
2090 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2091
2092
jadmanski0afbb632008-06-06 21:10:57 +00002093 def set_host(self, host):
2094 if host:
2095 self.queue_log_record('Assigning host ' + host.hostname)
2096 self.update_field('host_id', host.id)
2097 self.update_field('active', True)
2098 self.block_host(host.id)
2099 else:
2100 self.queue_log_record('Releasing host')
2101 self.unblock_host(self.host.id)
2102 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002103
jadmanski0afbb632008-06-06 21:10:57 +00002104 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002105
2106
jadmanski0afbb632008-06-06 21:10:57 +00002107 def get_host(self):
2108 return self.host
mbligh36768f02008-02-22 18:28:33 +00002109
2110
jadmanski0afbb632008-06-06 21:10:57 +00002111 def queue_log_record(self, log_line):
2112 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002113 _drone_manager.write_lines_to_file(self.queue_log_path,
2114 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002115
2116
jadmanski0afbb632008-06-06 21:10:57 +00002117 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002118 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002119 row = [0, self.job.id, host_id]
2120 block = IneligibleHostQueue(row=row, new_record=True)
2121 block.save()
mblighe2586682008-02-29 22:45:46 +00002122
2123
jadmanski0afbb632008-06-06 21:10:57 +00002124 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002125 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002126 blocks = IneligibleHostQueue.fetch(
2127 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2128 for block in blocks:
2129 block.delete()
mblighe2586682008-02-29 22:45:46 +00002130
2131
showard2bab8f42008-11-12 18:15:22 +00002132 def set_execution_subdir(self, subdir=None):
2133 if subdir is None:
2134 assert self.get_host()
2135 subdir = self.get_host().hostname
2136 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002137
2138
showard6355f6b2008-12-05 18:52:13 +00002139 def _get_hostname(self):
2140 if self.host:
2141 return self.host.hostname
2142 return 'no host'
2143
2144
showard170873e2009-01-07 00:22:26 +00002145 def __str__(self):
2146 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2147
2148
jadmanski0afbb632008-06-06 21:10:57 +00002149 def set_status(self, status):
mblighf8c624d2008-07-03 16:58:45 +00002150 abort_statuses = ['Abort', 'Aborting', 'Aborted']
2151 if status not in abort_statuses:
2152 condition = ' AND '.join(['status <> "%s"' % x
2153 for x in abort_statuses])
2154 else:
2155 condition = ''
2156 self.update_field('status', status, condition=condition)
2157
showardb18134f2009-03-20 20:52:18 +00002158 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002159
showardc85c21b2008-11-24 22:17:37 +00002160 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002161 self.update_field('complete', False)
2162 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002163
jadmanski0afbb632008-06-06 21:10:57 +00002164 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showarde58e3f82008-11-20 19:04:59 +00002165 'Aborting']:
jadmanski0afbb632008-06-06 21:10:57 +00002166 self.update_field('complete', False)
2167 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002168
showardc85c21b2008-11-24 22:17:37 +00002169 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002170 self.update_field('complete', True)
2171 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002172
2173 should_email_status = (status.lower() in _notify_email_statuses or
2174 'all' in _notify_email_statuses)
2175 if should_email_status:
2176 self._email_on_status(status)
2177
2178 self._email_on_job_complete()
2179
2180
2181 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002182 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002183
2184 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2185 self.job.id, self.job.name, hostname, status)
2186 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2187 self.job.id, self.job.name, hostname, status,
2188 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002189 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002190
2191
2192 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002193 if not self.job.is_finished():
2194 return
showard542e8402008-09-19 20:16:18 +00002195
showardc85c21b2008-11-24 22:17:37 +00002196 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002197 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002198 for queue_entry in hosts_queue:
2199 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002200 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002201 queue_entry.status))
2202
2203 summary_text = "\n".join(summary_text)
2204 status_counts = models.Job.objects.get_status_counts(
2205 [self.job.id])[self.job.id]
2206 status = ', '.join('%d %s' % (count, status) for status, count
2207 in status_counts.iteritems())
2208
2209 subject = 'Autotest: Job ID: %s "%s" %s' % (
2210 self.job.id, self.job.name, status)
2211 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2212 self.job.id, self.job.name, status, self._view_job_url(),
2213 summary_text)
showard170873e2009-01-07 00:22:26 +00002214 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002215
2216
showard89f84db2009-03-12 20:39:13 +00002217 def run(self, assigned_host=None):
2218 if self.meta_host is not None or self.atomic_group_id is not None:
jadmanski0afbb632008-06-06 21:10:57 +00002219 assert assigned_host
2220 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00002221 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00002222
showardb18134f2009-03-20 20:52:18 +00002223 logging.info("%s/%s/%s scheduled on %s, status=%s",
2224 self.job.name, self.meta_host, self.atomic_group_id,
2225 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002226
jadmanski0afbb632008-06-06 21:10:57 +00002227 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002228
showard6ae5ea92009-02-25 00:11:51 +00002229
jadmanski0afbb632008-06-06 21:10:57 +00002230 def requeue(self):
2231 self.set_status('Queued')
showardde634ee2009-01-30 01:44:24 +00002232 # verify/cleanup failure sets the execution subdir, so reset it here
2233 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002234 if self.meta_host:
2235 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002236
2237
jadmanski0afbb632008-06-06 21:10:57 +00002238 def handle_host_failure(self):
2239 """\
2240 Called when this queue entry's host has failed verification and
2241 repair.
2242 """
2243 assert not self.meta_host
2244 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002245 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002246
2247
jadmanskif7fa2cc2008-10-01 14:13:23 +00002248 @property
2249 def aborted_by(self):
2250 self._load_abort_info()
2251 return self._aborted_by
2252
2253
2254 @property
2255 def aborted_on(self):
2256 self._load_abort_info()
2257 return self._aborted_on
2258
2259
2260 def _load_abort_info(self):
2261 """ Fetch info about who aborted the job. """
2262 if hasattr(self, "_aborted_by"):
2263 return
2264 rows = _db.execute("""
2265 SELECT users.login, aborted_host_queue_entries.aborted_on
2266 FROM aborted_host_queue_entries
2267 INNER JOIN users
2268 ON users.id = aborted_host_queue_entries.aborted_by_id
2269 WHERE aborted_host_queue_entries.queue_entry_id = %s
2270 """, (self.id,))
2271 if rows:
2272 self._aborted_by, self._aborted_on = rows[0]
2273 else:
2274 self._aborted_by = self._aborted_on = None
2275
2276
showardb2e2c322008-10-14 17:33:55 +00002277 def on_pending(self):
2278 """
2279 Called when an entry in a synchronous job has passed verify. If the
2280 job is ready to run, returns an agent to run the job. Returns None
2281 otherwise.
2282 """
2283 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002284 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002285 if self.job.is_ready():
2286 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002287 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002288 return None
2289
2290
showard170873e2009-01-07 00:22:26 +00002291 def abort(self, dispatcher, agents_to_abort=[]):
showard1be97432008-10-17 15:30:45 +00002292 host = self.get_host()
showard9d9ffd52008-11-09 23:14:35 +00002293 if self.active and host:
showard170873e2009-01-07 00:22:26 +00002294 dispatcher.add_agent(Agent(tasks=host.reverify_tasks()))
showard1be97432008-10-17 15:30:45 +00002295
showard170873e2009-01-07 00:22:26 +00002296 abort_task = AbortTask(self, agents_to_abort)
showard1be97432008-10-17 15:30:45 +00002297 self.set_status('Aborting')
showard170873e2009-01-07 00:22:26 +00002298 dispatcher.add_agent(Agent(tasks=[abort_task], num_processes=0))
2299
2300 def execution_tag(self):
2301 assert self.execution_subdir
2302 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002303
2304
mbligh36768f02008-02-22 18:28:33 +00002305class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002306 _table_name = 'jobs'
2307 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2308 'control_type', 'created_on', 'synch_count', 'timeout',
2309 'run_verify', 'email_list', 'reboot_before', 'reboot_after')
2310
2311
showarda3c58572009-03-12 20:36:59 +00002312 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002313 assert id or row
showarda3c58572009-03-12 20:36:59 +00002314 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002315
mblighe2586682008-02-29 22:45:46 +00002316
jadmanski0afbb632008-06-06 21:10:57 +00002317 def is_server_job(self):
2318 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002319
2320
showard170873e2009-01-07 00:22:26 +00002321 def tag(self):
2322 return "%s-%s" % (self.id, self.owner)
2323
2324
jadmanski0afbb632008-06-06 21:10:57 +00002325 def get_host_queue_entries(self):
2326 rows = _db.execute("""
2327 SELECT * FROM host_queue_entries
2328 WHERE job_id= %s
2329 """, (self.id,))
2330 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002331
jadmanski0afbb632008-06-06 21:10:57 +00002332 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002333
jadmanski0afbb632008-06-06 21:10:57 +00002334 return entries
mbligh36768f02008-02-22 18:28:33 +00002335
2336
jadmanski0afbb632008-06-06 21:10:57 +00002337 def set_status(self, status, update_queues=False):
2338 self.update_field('status',status)
2339
2340 if update_queues:
2341 for queue_entry in self.get_host_queue_entries():
2342 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002343
2344
jadmanski0afbb632008-06-06 21:10:57 +00002345 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002346 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2347 status='Pending')
2348 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002349
2350
jadmanski0afbb632008-06-06 21:10:57 +00002351 def num_machines(self, clause = None):
2352 sql = "job_id=%s" % self.id
2353 if clause:
2354 sql += " AND (%s)" % clause
2355 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002356
2357
jadmanski0afbb632008-06-06 21:10:57 +00002358 def num_queued(self):
2359 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002360
2361
jadmanski0afbb632008-06-06 21:10:57 +00002362 def num_active(self):
2363 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002364
2365
jadmanski0afbb632008-06-06 21:10:57 +00002366 def num_complete(self):
2367 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002368
2369
jadmanski0afbb632008-06-06 21:10:57 +00002370 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002371 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002372
mbligh36768f02008-02-22 18:28:33 +00002373
showard6bb7c292009-01-30 01:44:51 +00002374 def _not_yet_run_entries(self, include_verifying=True):
2375 statuses = [models.HostQueueEntry.Status.QUEUED,
2376 models.HostQueueEntry.Status.PENDING]
2377 if include_verifying:
2378 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2379 return models.HostQueueEntry.objects.filter(job=self.id,
2380 status__in=statuses)
2381
2382
2383 def _stop_all_entries(self):
2384 entries_to_stop = self._not_yet_run_entries(
2385 include_verifying=False)
2386 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002387 assert not child_entry.complete, (
2388 '%s status=%s, active=%s, complete=%s' %
2389 (child_entry.id, child_entry.status, child_entry.active,
2390 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002391 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2392 child_entry.host.status = models.Host.Status.READY
2393 child_entry.host.save()
2394 child_entry.status = models.HostQueueEntry.Status.STOPPED
2395 child_entry.save()
2396
showard2bab8f42008-11-12 18:15:22 +00002397 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002398 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002399 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002400 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002401
2402
jadmanski0afbb632008-06-06 21:10:57 +00002403 def write_to_machines_file(self, queue_entry):
2404 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002405 file_path = os.path.join(self.tag(), '.machines')
2406 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002407
2408
showard2bab8f42008-11-12 18:15:22 +00002409 def _next_group_name(self):
2410 query = models.HostQueueEntry.objects.filter(
2411 job=self.id).values('execution_subdir').distinct()
2412 subdirs = (entry['execution_subdir'] for entry in query)
2413 groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
2414 ids = [int(match.group(1)) for match in groups if match]
2415 if ids:
2416 next_id = max(ids) + 1
2417 else:
2418 next_id = 0
2419 return "group%d" % next_id
2420
2421
showard170873e2009-01-07 00:22:26 +00002422 def _write_control_file(self, execution_tag):
2423 control_path = _drone_manager.attach_file_to_execution(
2424 execution_tag, self.control_file)
2425 return control_path
mbligh36768f02008-02-22 18:28:33 +00002426
showardb2e2c322008-10-14 17:33:55 +00002427
showard2bab8f42008-11-12 18:15:22 +00002428 def get_group_entries(self, queue_entry_from_group):
2429 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002430 return list(HostQueueEntry.fetch(
2431 where='job_id=%s AND execution_subdir=%s',
2432 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002433
2434
showardb2e2c322008-10-14 17:33:55 +00002435 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002436 assert queue_entries
2437 execution_tag = queue_entries[0].execution_tag()
2438 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002439 hostnames = ','.join([entry.get_host().hostname
2440 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002441
showard170873e2009-01-07 00:22:26 +00002442 params = [_autoserv_path, '-P', execution_tag, '-p', '-n',
2443 '-r', _drone_manager.absolute_path(execution_tag),
2444 '-u', self.owner, '-l', self.name, '-m', hostnames,
2445 _drone_manager.absolute_path(control_path)]
mbligh36768f02008-02-22 18:28:33 +00002446
jadmanski0afbb632008-06-06 21:10:57 +00002447 if not self.is_server_job():
2448 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002449
showardb2e2c322008-10-14 17:33:55 +00002450 return params
mblighe2586682008-02-29 22:45:46 +00002451
mbligh36768f02008-02-22 18:28:33 +00002452
showardc9ae1782009-01-30 01:42:37 +00002453 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002454 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002455 return True
showard0fc38302008-10-23 00:44:07 +00002456 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002457 return queue_entry.get_host().dirty
2458 return False
showard21baa452008-10-21 00:08:39 +00002459
showardc9ae1782009-01-30 01:42:37 +00002460
2461 def _should_run_verify(self, queue_entry):
2462 do_not_verify = (queue_entry.host.protection ==
2463 host_protections.Protection.DO_NOT_VERIFY)
2464 if do_not_verify:
2465 return False
2466 return self.run_verify
2467
2468
2469 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002470 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002471 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002472 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002473 if self._should_run_verify(queue_entry):
2474 tasks.append(VerifyTask(queue_entry=queue_entry))
2475 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002476 return tasks
2477
2478
showard2bab8f42008-11-12 18:15:22 +00002479 def _assign_new_group(self, queue_entries):
2480 if len(queue_entries) == 1:
2481 group_name = queue_entries[0].get_host().hostname
2482 else:
2483 group_name = self._next_group_name()
showardb18134f2009-03-20 20:52:18 +00002484 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002485 self.id, [entry.host.hostname for entry in queue_entries],
2486 group_name)
2487
2488 for queue_entry in queue_entries:
2489 queue_entry.set_execution_subdir(group_name)
2490
2491
2492 def _choose_group_to_run(self, include_queue_entry):
2493 chosen_entries = [include_queue_entry]
2494
2495 num_entries_needed = self.synch_count - 1
2496 if num_entries_needed > 0:
2497 pending_entries = HostQueueEntry.fetch(
2498 where='job_id = %s AND status = "Pending" AND id != %s',
2499 params=(self.id, include_queue_entry.id))
2500 chosen_entries += list(pending_entries)[:num_entries_needed]
2501
2502 self._assign_new_group(chosen_entries)
2503 return chosen_entries
2504
2505
2506 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002507 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002508 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2509 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002510
showard2bab8f42008-11-12 18:15:22 +00002511 queue_entries = self._choose_group_to_run(queue_entry)
2512 return self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00002513
2514
2515 def _finish_run(self, queue_entries, initial_tasks=[]):
showardb2ccdda2008-10-28 20:39:05 +00002516 for queue_entry in queue_entries:
2517 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002518 params = self._get_autoserv_params(queue_entries)
2519 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2520 cmd=params)
2521 tasks = initial_tasks + [queue_task]
2522 entry_ids = [entry.id for entry in queue_entries]
2523
showard170873e2009-01-07 00:22:26 +00002524 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002525
2526
mbligh36768f02008-02-22 18:28:33 +00002527if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002528 main()